Re: flink-benchmarks使用求助

2020-07-13 文章 Congxian Qiu
Hi zilong

之前没有使用 `-t max` 跑过,你可以分享一下你使用的全部命令么?我可以本地看看

Best,
Congxian


zilong xiao  于2020年7月14日周二 上午10:16写道:

> `-t max`之后出现的~ 改小并发后貌似没问题
>
> Congxian Qiu  于2020年7月13日周一 下午8:14写道:
>
> > Hi
> >
> > 没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢?
> >
> > Best,
> > Congxian
> >
> >
> > zilong xiao  于2020年7月13日周一 下午2:32写道:
> >
> > > 是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown
> > > timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗?
> > >
> > > Congxian Qiu  于2020年7月10日周五 下午7:18写道:
> > >
> > > > Hi
> > > > 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme
> 能跑出一个结果(csv
> > > > 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的可以阅读 JMH 的相关文档[2]
> > > >
> > > > [1] https://github.com/dataArtisans/flink-benchmarks
> > > > [2] http://openjdk.java.net/projects/code-tools/jmh/
> > > >
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > zilong xiao  于2020年7月10日周五 下午3:54写道:
> > > >
> > > > >
> > 如题,最近在新机器上跑flink-benchmarks验证下机器性能,但是不太会对跑出的结果进行分析,不知是否有大神也用过这个,可否指点一二
> > > > >
> > > >
> > >
> >
>


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Congxian Qiu
Hi

这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢?
另外你可以看下 tm log 看看有没有其他异常

Best,
Congxian


Yun Tang  于2020年7月14日周二 上午11:57写道:

> Hi Peihui
>
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> cause。
>
> [1]
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>
>
> 祝好
> 唐云
> 
> From: Peihui He 
> Sent: Tuesday, July 14, 2020 10:42
> To: user-zh@flink.apache.org 
> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> hello,
>
> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 配置和1.9.2 一样:
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> state.backend.incremental: true
>
> 代码上都有
>
> env.enableCheckpointing(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>
>
>   是1.10.0 需要做什么特别配置么?
>


Re: 退订

2020-07-13 文章 Congxian Qiu
Hi

退订需要发送邮件到  user-zh-unsubscr...@flink.apache.org

Best,
Congxian


成欢晴  于2020年7月14日周二 下午12:44写道:

> 退订
>
>
> | |
> chq19970719
> |
> |
> 邮箱:chq19970...@163.com
> |
>
> Signature is customized by Netease Mail Master


Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-13 文章 Congxian Qiu
Hi

如果可以的话,建议先调用 RestClient 的 stop 等命令(这样可以在最后做一次 savepoint,或者 checkpoint -- 这个
FLINK-12619 想做),然后失败再使用 yarn 的 kill 命令,这样能够减少后续启动时的回放数据量

Best,
Congxian


zhisheng  于2020年7月14日周二 下午12:53写道:

> 如果是 on yarn 的话,也可以直接调用 yarn 的 kill 命令停止作业
>
> Jeff Zhang  于2020年7月11日周六 下午11:23写道:
>
> > Zeppelin 能够帮你提交和cancel job,就是通过上面jianxu说的ClusterClient
> > api来做到的,对zeppelin感兴趣的话,可以参考这个视频
> >
> > https://www.bilibili.com/video/BV1Te411W73b?p=21
> >
> >
> > jianxu  于2020年7月11日周六 下午4:52写道:
> >
> > > Hi:
> > >
> > >
> >
> 我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID
> > > jobId)取消流任务。
> > > Flink源码可以看看 CliFrontend[1]中的逻辑,如果觉得比较麻烦可以参考
> > > https://github.com/todd5167/flink-spark-submiter
> > > 项目的任务提交部分,取消任务时构建ClusterClient即可。
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > | |
> > > jianxu
> > > |
> > > |
> > > rjia...@163.com
> > > |
> > >
> > >
> > >
> > >
> > > 在2020年07月11日 16:19,Congxian Qiu 写道:
> > > Hi
> > >
> > > 如果你是想做一个作业管理的平台,可以尝试看一下 CliFrontend[1] 中相关的逻辑,对于 On Yarn
> > > 的作业,简单地说你需要能够正确的初始化一个 client 和 Yarn RM 交互,然后你需要知道
> applicationId,另外你还需要知道
> > > flink 的 JobId,接下来就是调用 Flink 的接口了
> > >
> > > 如果像更多的了解参数如从和命令行传到 java 代码的,你可以自己写一个单元测试,单步调试一下整个流程。
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > godfrey he  于2020年7月9日周四 上午10:08写道:
> > >
> > > 可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
> > > JobClient 可以 cancel 作业,获取 job status。
> > >
> > > [1]
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> > >
> > > Best,
> > > Godfrey
> > >
> > > Evan  于2020年7月9日周四 上午9:40写道:
> > >
> > > 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
> > > API有没有提供类似的接口,调用后就能停止这个Stream作业呢?
> > >
> > >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


Re: State 0点清除的问题

2020-07-13 文章 Congxian Qiu
Hi
如果你需要精确的控制每天 0 点清除  state 的话,或许你可以考虑使用 processFunction[1], 然后自己使用 timer
实现相关逻辑

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/process_function.html
Best,
Congxian


ゞ野蠻遊戲χ  于2020年7月14日周二 下午1:10写道:

> 大家好:  
> 我想问下,在ProcessAllWindowFunction中,在每天的0点清除state如何清除?
>
>
> Thanks
> 嘉治


Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 Robin Zhang
没有使用窗口呢,就多表关联,涉及到流表join流表,流表join维表,group by 、topN等



--
Sent from: http://apache-flink.147419.n8.nabble.com/

State 0????????????

2020-07-13 文章 ?g???U?[????
   
ProcessAllWindowFunction0??state??


Thanks


Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-13 文章 zhisheng
如果是 on yarn 的话,也可以直接调用 yarn 的 kill 命令停止作业

Jeff Zhang  于2020年7月11日周六 下午11:23写道:

> Zeppelin 能够帮你提交和cancel job,就是通过上面jianxu说的ClusterClient
> api来做到的,对zeppelin感兴趣的话,可以参考这个视频
>
> https://www.bilibili.com/video/BV1Te411W73b?p=21
>
>
> jianxu  于2020年7月11日周六 下午4:52写道:
>
> > Hi:
> >
> >
> 我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID
> > jobId)取消流任务。
> > Flink源码可以看看 CliFrontend[1]中的逻辑,如果觉得比较麻烦可以参考
> > https://github.com/todd5167/flink-spark-submiter
> > 项目的任务提交部分,取消任务时构建ClusterClient即可。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > | |
> > jianxu
> > |
> > |
> > rjia...@163.com
> > |
> >
> >
> >
> >
> > 在2020年07月11日 16:19,Congxian Qiu 写道:
> > Hi
> >
> > 如果你是想做一个作业管理的平台,可以尝试看一下 CliFrontend[1] 中相关的逻辑,对于 On Yarn
> > 的作业,简单地说你需要能够正确的初始化一个 client 和 Yarn RM 交互,然后你需要知道 applicationId,另外你还需要知道
> > flink 的 JobId,接下来就是调用 Flink 的接口了
> >
> > 如果像更多的了解参数如从和命令行传到 java 代码的,你可以自己写一个单元测试,单步调试一下整个流程。
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
> >
> > Best,
> > Congxian
> >
> >
> > godfrey he  于2020年7月9日周四 上午10:08写道:
> >
> > 可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
> > JobClient 可以 cancel 作业,获取 job status。
> >
> > [1]
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> >
> > Best,
> > Godfrey
> >
> > Evan  于2020年7月9日周四 上午9:40写道:
> >
> > 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
> > API有没有提供类似的接口,调用后就能停止这个Stream作业呢?
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>


退订

2020-07-13 文章 成欢晴
退订


| |
chq19970719
|
|
邮箱:chq19970...@163.com
|

Signature is customized by Netease Mail Master

Re: (无主题)

2020-07-13 文章 Jingsong Li
Hi

退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org

Best
Jingsong

On Tue, Jul 14, 2020 at 12:36 PM 成欢晴  wrote:

> 退订
>
>
> | |
> chq19970719
> |
> |
> 邮箱:chq19970...@163.com
> |
>
> Signature is customized by Netease Mail Master



-- 
Best, Jingsong Lee


(无主题)

2020-07-13 文章 成欢晴
退订


| |
chq19970719
|
|
邮箱:chq19970...@163.com
|

Signature is customized by Netease Mail Master

Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 zhisheng
有没有窗口啊?

Robin Zhang  于2020年7月14日周二 上午11:48写道:

> 
> 我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
> 代码如下:
>tEnv.getConfig()
>  .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),
>
> Time.hours(maxIdleStateRetentionTime));
>
> 程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: flink on yarn日志问题

2020-07-13 文章 zhisheng
知道 YARN 的 applicationId,应该也可以去 HDFS 找对应的 taskmanager 的日志(可以拼出路径),然后复制到本地去查看

Yangze Guo  于2020年7月14日周二 上午11:58写道:

> Hi, 王松
>
> 我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。
>
> Best,
> Yangze Guo
>
> On Tue, Jul 14, 2020 at 8:26 AM 王松  wrote:
> >
> > 我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。
> >
> > Yangze Guo  于2020年7月13日周一 下午5:03写道:
> >
> > > 1.
> > >
> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> > > 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
> > >
> > > [1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
> > > >
> > > > 不好意思  怪我灭有描述清楚
> > > > 1 目前开启日志收集功能
> > > > 2 目前已是 per-job模式
> > > > 3 集群使用cdh flink.1.10
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
> > > > >Hi,
> > > > >
> > > > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> > > > >
> > > > >第二个问题,您可以尝试一下per-job mode [2][3]
> > > > >
> > > > >[1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > > > >[2]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > > > >[3]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > > > >
> > > > >
> > > > >Best,
> > > > >Yangze Guo
> > > > >
> > > > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
> > > > >>
> > > > >> 请问一下两个问题
> > > > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
> > > ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> > > > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
> > > 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
> > > > >>
> > >
>


退出邮件组

2020-07-13 文章 成欢晴




| |
chq19970719
|
|
邮箱:chq19970...@163.com
|

Signature is customized by Netease Mail Master

回复:flink1.9状态及作业迁移

2020-07-13 文章 成欢晴
退订




| |
chq19970719
|
|
邮箱:chq19970...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月14日 12:15,Yun Tang 写道:
对于Flink本身机制不支持的场景,可以通过直接修改Checkpoint meta 
文件同时将meta以及data文件迁移到新HDFS集群也能做到,加载Checkpoint的具体代码可以参照Checkpoints#loadAndValidateCheckpoint
 [1],而存储Checkpoint的代码可以参照Checkpoints#storeCheckpointMetadata [2]


[1] 
https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L124
[2] 
https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L81


祝好
唐云

From: Dream-底限 
Sent: Tuesday, July 14, 2020 11:57
To: user-zh@flink.apache.org 
Subject: Re: flink1.9状态及作业迁移

hi、
请问对于下面的情况,Checkpoint meta中存储的hdfs namespace可以修改吗
》》Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Yun Tang  于2020年7月14日周二 上午11:54写道:

> Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。
>
> Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-5763
>
> 祝好
> 唐云
>
> 
> From: Dream-底限 
> Sent: Tuesday, July 14, 2020 11:07
> To: user-zh@flink.apache.org 
> Subject: flink1.9状态及作业迁移
>
> hi:
>
> flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?
>


Re: flink1.9状态及作业迁移

2020-07-13 文章 Yun Tang
对于Flink本身机制不支持的场景,可以通过直接修改Checkpoint meta 
文件同时将meta以及data文件迁移到新HDFS集群也能做到,加载Checkpoint的具体代码可以参照Checkpoints#loadAndValidateCheckpoint
 [1],而存储Checkpoint的代码可以参照Checkpoints#storeCheckpointMetadata [2]


[1] 
https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L124
[2] 
https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L81


祝好
唐云

From: Dream-底限 
Sent: Tuesday, July 14, 2020 11:57
To: user-zh@flink.apache.org 
Subject: Re: flink1.9状态及作业迁移

hi、
请问对于下面的情况,Checkpoint meta中存储的hdfs namespace可以修改吗
》》Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Yun Tang  于2020年7月14日周二 上午11:54写道:

> Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。
>
> Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-5763
>
> 祝好
> 唐云
>
> 
> From: Dream-底限 
> Sent: Tuesday, July 14, 2020 11:07
> To: user-zh@flink.apache.org 
> Subject: flink1.9状态及作业迁移
>
> hi:
>
> flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?
>


Re: Re: flink on yarn日志问题

2020-07-13 文章 Yangze Guo
Hi, 王松

我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。

Best,
Yangze Guo

On Tue, Jul 14, 2020 at 8:26 AM 王松  wrote:
>
> 我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。
>
> Yangze Guo  于2020年7月13日周一 下午5:03写道:
>
> > 1.
> > 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> > 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
> >
> > Best,
> > Yangze Guo
> >
> >
> > On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
> > >
> > > 不好意思  怪我灭有描述清楚
> > > 1 目前开启日志收集功能
> > > 2 目前已是 per-job模式
> > > 3 集群使用cdh flink.1.10
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
> > > >Hi,
> > > >
> > > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> > > >
> > > >第二个问题,您可以尝试一下per-job mode [2][3]
> > > >
> > > >[1]
> > https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > > >[2]
> > https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > > >[3]
> > https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > > >
> > > >
> > > >Best,
> > > >Yangze Guo
> > > >
> > > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
> > > >>
> > > >> 请问一下两个问题
> > > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
> > ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> > > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
> > 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
> > > >>
> >


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Yun Tang
Hi Peihui

你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
 cause。

[1] 
https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473


祝好
唐云

From: Peihui He 
Sent: Tuesday, July 14, 2020 10:42
To: user-zh@flink.apache.org 
Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

hello,

当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示


Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
-> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

配置和1.9.2 一样:
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
state.savepoints.dir: hdfs:///flink/savepoints/wc/
state.backend.incremental: true

代码上都有

env.enableCheckpointing(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));


  是1.10.0 需要做什么特别配置么?


Re: flink1.9状态及作业迁移

2020-07-13 文章 Dream-底限
hi、
请问对于下面的情况,Checkpoint meta中存储的hdfs namespace可以修改吗
》》Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Yun Tang  于2020年7月14日周二 上午11:54写道:

> Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。
>
> Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-5763
>
> 祝好
> 唐云
>
> 
> From: Dream-底限 
> Sent: Tuesday, July 14, 2020 11:07
> To: user-zh@flink.apache.org 
> Subject: flink1.9状态及作业迁移
>
> hi:
>
> flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?
>


Re: flink1.9状态及作业迁移

2020-07-13 文章 Yun Tang
Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。


[1] https://issues.apache.org/jira/browse/FLINK-5763

祝好
唐云


From: Dream-底限 
Sent: Tuesday, July 14, 2020 11:07
To: user-zh@flink.apache.org 
Subject: flink1.9状态及作业迁移

hi:
flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?


Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 Robin Zhang
 
我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
代码如下:
   tEnv.getConfig()
 .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime), 

Time.hours(maxIdleStateRetentionTime));

程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hi

那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 
在导入Kafka,之后再FlinkSQL 处理。

可爱的木兰


发件人: Benchao Li 
发送时间: 2020年7月14日 11:00
收件人: user-zh 
主题: Re: Flink SQL处理Array型的JSON

我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].

[1] https://issues.apache.org/jira/browse/FLINK-18590

Leonard Xu  于2020年7月14日周二 上午10:42写道:

> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan  写道:
> >
> > 可爱的木兰
>
>

--

Best,
Benchao Li


回复: Flink SQL复杂JSON解析

2020-07-13 文章 hua mulan
Hi

那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 
在导入Kafka,之后再FlinkSQL 处理。

可爱的木兰

发件人: Benchao Li 
发送时间: 2020年7月8日 20:46
收件人: user-zh 
主题: Re: Flink SQL复杂JSON解析

看代码应该是支持复合类型的,你可以试下。

hua mulan  于2020年7月8日周三 下午8:34写道:

> 我试了下 Array里是基本元素可以CROSS JOIN
> UNNEST直接解开。如果Array里是Row、POJO、Tuple这种复合类型我就只能UDTF了是吧。
>
> 来自 Outlook
>
> 
> 发件人: Benchao Li 
> 发送时间: 2020年7月6日 22:35
> 收件人: user-zh 
> 主题: Re: Flink SQL复杂JSON解析
>
> 我理解最佳实践是第一种,先读出来array,再用table function展开成多行。
> 实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>
> 王 outlook  于2020年7月6日周一 下午9:29写道:
>
> > 像如下这种JSON输入,
> >
> > {
> >   "id": 1,
> >   "many_names": [
> > {"name": "foo"},
> > {"name": "bar"}
> >   ]
> > }
> >
> > 输出表两行  id 1, name foo  |  id 1, name bar
> >
> > 最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?
> > 还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。
> >
> >
> > 来自 Outlook
> >
>
>
> --
>
> Best,
> Benchao Li
>


--

Best,
Benchao Li


flink1.9状态及作业迁移

2020-07-13 文章 Dream-底限
hi:
flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?


flink cep 如何处理超时事件?

2020-07-13 文章 drewfranklin
Hello all.
 想请教下各位。
 
我有个用户开户超时断点的场景。调研了一下,想通过flink cep 来实现。
 
但是我定义pattern 后发现,我的这个没办法在一条事件数据上完成判定。必须借助和上一事件数据比较之后判断是不是超时。


想知道该如何定义pattern 能够,取到排序之后前后两个两个事件。

Re:回复:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
Hi,
感谢社区热心答疑!

















在 2020-07-14 11:00:18,"夏帅"  写道:
>你好,
>本质还是StreamingFileSink,所以目前只能append
>
>
>--
>发件人:Zhou Zach 
>发送时间:2020年7月14日(星期二) 10:56
>收件人:user-zh 
>主 题:Re:Re: flink 同时sink hbase和hive,hbase少记录
>
>
>
>
>Hi Leonard,
>原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-07-14 09:56:00,"Leonard Xu"  写道:
>>Hi,
>>
>>> 在 2020年7月14日,09:52,Zhou Zach  写道:
>>> 
>   |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
> cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>>
>>看下这个抽取出来的rowkey是否有重复的呢?
>>
>>祝好,
>>Leonard Xu


回复:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 夏帅
你好,
本质还是StreamingFileSink,所以目前只能append


--
发件人:Zhou Zach 
发送时间:2020年7月14日(星期二) 10:56
收件人:user-zh 
主 题:Re:Re: flink 同时sink hbase和hive,hbase少记录




Hi Leonard,
原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式














在 2020-07-14 09:56:00,"Leonard Xu"  写道:
>Hi,
>
>> 在 2020年7月14日,09:52,Zhou Zach  写道:
>> 
   |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
 cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>
>看下这个抽取出来的rowkey是否有重复的呢?
>
>祝好,
>Leonard Xu


Re: Flink SQL处理Array型的JSON

2020-07-13 文章 Benchao Li
我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].

[1] https://issues.apache.org/jira/browse/FLINK-18590

Leonard Xu  于2020年7月14日周二 上午10:42写道:

> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan  写道:
> >
> > 可爱的木兰
>
>

-- 

Best,
Benchao Li


Re:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach



Hi Leonard,
原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式














在 2020-07-14 09:56:00,"Leonard Xu"  写道:
>Hi,
>
>> 在 2020年7月14日,09:52,Zhou Zach  写道:
>> 
   |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
 cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>
>看下这个抽取出来的rowkey是否有重复的呢?
>
>祝好,
>Leonard Xu


回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hello,Leonard Xu

我这边JSON 不是

{
"id": 2,
"heap": [
{
"foo": 14,
"bar": "foo"

},
{
"foo": 16,
"bar": "bar"
}
],
}

而是直接一个Array

[
{
"foo": 14,
"bar": "foo"

},
{
"foo": 16,
"bar": "bar"
}
]

我发现DDL没法声明,SQL层面我不知道怎么做了。

可爱的木兰


发件人: Leonard Xu 
发送时间: 2020年7月14日 10:42
收件人: user-zh 
主题: Re: Flink SQL处理Array型的JSON

Hello,可爱的木兰

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
 


> 在 2020年7月14日,10:34,hua mulan  写道:
>
> 可爱的木兰



Re: 【Flink Join内存问题】

2020-07-13 文章 admin
regular join会缓存两边流的所有数据,interval join只存一段时间内的,相比当然节省很大的状态存储

> 2020年7月13日 下午10:30,忝忝向仧 <153488...@qq.com> 写道:
> 
> Hi:
> 
> 
> interval join可以缓解key值过多问题么?
> interval join不也是计算某段时间范围内的join么,跟regular join相比,如何做到避免某个stream的key过多问题?
> 谢谢.
> 
> 
> 
> 
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
> <17626017...@163.com;
> 发送时间:2020年7月6日(星期一) 中午11:12
> 收件人:"user-zh" 
> 主题:Re: 【Flink Join内存问题】
> 
> 
> 
> regular join确实是这样,所以量大的话可以用interval join 、temporal join
> 
>  2020年7月5日 下午3:50,忝忝向仧 <153488...@qq.com 写道:
>  
>  Hi,all:
>  
>  我看源码里写到JoinedStreams:
>  也就是说join时候都是走内存计算的,那么如果某个stream的key值过多,会导致oom
>  那么有什么预防措施呢?
>  将key值多的一边进行打散?
>  
>  
>  Right now, the join is being evaluated in memory so you need to ensure 
> that the number
>  * of elements per key does not get too high. Otherwise the JVM might 
> crash.



自定义的sql connector在sql-cli中运行问题

2020-07-13 文章 admin
hi all,
我自定义了一个sql 
connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下
2020-07-14 10:36:29,148 WARN  org.apache.flink.table.client.cli.CliClient   
   [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update 
statement.
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251]
   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
Caused by: scala.MatchError: null
   at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.Option.map(Option.scala:146) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341)
 ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$17(LocalExecutor.java:691)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:246)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
好吧,感谢回答



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Peihui He
hello,

当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示


Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
-> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

配置和1.9.2 一样:
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
state.savepoints.dir: hdfs:///flink/savepoints/wc/
state.backend.incremental: true

代码上都有

env.enableCheckpointing(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));


  是1.10.0 需要做什么特别配置么?


Re: Flink SQL处理Array型的JSON

2020-07-13 文章 Leonard Xu
Hello,可爱的木兰

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
 


> 在 2020年7月14日,10:34,hua mulan  写道:
> 
> 可爱的木兰



Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-13 文章 jindy_liu
本来还想尽最大可能的复用源码,看了下JdbcOutputFormat的源码实现,batch size是sql语句的个数据,kafka的batch
size是字节数,两个协调不好,两个各sink各自的时间阈值也同步不了。

我准备按你的说的方式,用RichFlatMapFunction,里面实现实现一个buffer。
等buffer达阈值或定时时间条件满足时,一次性手动调用JdbcOutputFormat(可以设置更大的buffer值)的writeRecord和flush;不满足的时候,RichFlatMapFunction里不输出元素;
这样kafka的batch sinka节奏应该就不用管了,两者的batch条件相互独立。
我自己初步看了下,应该可以?
初学者,望大佬提点,还有其它的注意事项要注意不?






--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hi

Kafka中的JSON结构是个Array例子如下。
[
 { "id": 1},
 { "id": 2}
]
读出来变成表的两行。Flink SQL层面最佳实践是什么?
如果没有办法是不是只能改JSON结构了。


可爱的木兰


flink1.9.1-消费kafka落pg库任务出错

2020-07-13 文章 nicygan
dear all:
  
我有一个消费kafka数据写到pg库的任务,任务发生过重启,yarn日志显示jobmanager发生oom,但找不到具体原因,因为数据量非常小,按道理不该发生oom。
  详细如下:


1、部署方式:
flink on yarn ,pre-job,每个container 1024 M
jobmanager的jvmoption(默认的)  -Xms424m-Xmx424m


2、数据情况:
kafka数据,约1分钟1条,文本数据,每条数据都非常小。


3、任务情况:
很简单,消费kafka然后直接写到pg库,中间没有任何处理,没有自定义的状态。
消费采用 FlinkKafkaConsumer
写库采用 JDBCAppendTableSink
并行度 1
checkpoint 2分钟一次,每次checkpoint约100ms
statebackend rocksdb


4、报错情况:
2020-07-10 11:51:54,237 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 555 @ 1594353114226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:51:54,421 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 555 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 77 ms).
2020-07-10 11:53:54,253 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 556 @ 1594353234226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:53:54,457 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 556 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 124 ms).
2020-07-10 11:55:54,246 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 557 @ 1594353354226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:55:54,402 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 557 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 115 ms).
2020-07-10 11:56:34,155 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
'flink-akka.actor.default-dispatcher-4673' produced an uncaught exception. 
Stopping the process...
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at 
akka.dispatch.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
at 
akka.dispatch.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
at 
akka.dispatch.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905)
at 
akka.dispatch.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
at akka.dispatch.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(ForkJoinExecutorConfigurator.scala:30)
at 
akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:211)
at 
akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:211)
at 
akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:39)
at akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:115)
at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:55)
at akka.actor.dungeon.Dispatch.sendMessage(Dispatch.scala:142)
at akka.actor.dungeon.Dispatch.sendMessage$(Dispatch.scala:136)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
at akka.actor.Cell.sendMessage(ActorCell.scala:350)
at akka.actor.Cell.sendMessage$(ActorCell.scala:349)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:173)
at akka.actor.Scheduler$$anon$3.run(Scheduler.scala:171)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

thanks all / by nicygan


Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Jark Wu
这可能是 connect API  的某个 bug 吧。 建议先用 DDL 。

Best,
Jark

On Tue, 14 Jul 2020 at 08:54, Hito Zhu  wrote:

> rowtime 定义如下,我发现 SchemaValidator#deriveFieldMapping 方法给移除了。
> Rowtime rowtime = new Rowtime()
> .timestampsFromField("searchTime")
> .watermarksPeriodicBounded(5 * 1000);
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
补充一下,kubernetes版本是1.18
Yvette zhai  于2020年7月13日周一 下午9:10写道:

> 1. 执行的脚本,产生的日志是:
> 2020-07-13 21:00:25,248 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.address, localhost
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.memory.process.size, 1600m
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.memory.process.size, 1728m
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: parallelism.default, 1
> 2020-07-13 21:00:25,252 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-07-13 21:00:25,344 INFO
>  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] -
> Could not load factory due to missing dependencies.
> 2020-07-13 21:00:26,136 INFO
>  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
> derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
> less than its min value 192.000mb (201326592 bytes), min value will be used
> instead
> 2020-07-13 21:00:26,154 INFO
>  org.apache.flink.kubernetes.utils.KubernetesUtils[] -
> Kubernetes deployment requires a fixed port. Configuration blob.server.port
> will be set to 6124
> 2020-07-13 21:00:26,154 INFO
>  org.apache.flink.kubernetes.utils.KubernetesUtils[] -
> Kubernetes deployment requires a fixed port. Configuration
> taskmanager.rpc.port will be set to 6122
> 2020-07-13 21:00:26,204 INFO
>  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
> derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
> less than its min value 192.000mb (201326592 bytes), min value will be used
> instead
> 2020-07-13 21:00:26,220 WARN
>  org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
> [] - Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop
> Configuration ConfigMap.
> 2020-07-13 21:00:26,220 WARN
>  org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
> [] - Found 0 files in directory null/etc/hadoop, skip to create the Hadoop
> Configuration ConfigMap.
> 2020-07-13 21:00:26,958 INFO
>  org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> flink session cluster k8s-session-1 successfully, JobManager Web Interface:
> http://172.16.5.175:8081
>
> 2. 查看 desrcibe 日志是:
> MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> "flink-config-k8s-session-1" not found
>
> 3. logs 日志是:
>
> Start command : /bin/bash -c $JAVA_HOME/bin/java -classpath
> $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824
> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log
> -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j.properties
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
> 1> /opt/flink/log/jobmanager.out 2> /opt/flink/log/jobmanager.err
>
> 4. kubectl get cm 可以看到
> NAME DATA   AGE
> flink-config-k8s-session-1   3  5m45s
>
> 麻烦大佬帮忙看看~是不是我的语句有问题还是缺什么文件~
> 我是直接官网下的包,没有改任何文件~
>
> Leonard Xu  于2020年7月13日周一 下午8:41写道:
>
>> Hi, zhai
>>
>> 可以贴详细点吗?我帮你 CC 了熟悉这块的大佬 Yun Gao
>>
>> 祝好
>>
>> > 在 2020年7月13日,20:11,Yvette zhai  写道:
>> >
>> > 报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap
>> > "flink-config-k8s-session-1" not found
>> >
>> >
>> > Leonard Xu  于2020年7月13日周一 下午8:03写道:
>> >
>> >> Hi, zhai
>> >>
>> >> 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。
>> >>
>> >> Best,
>> >> Leonard Xu
>> >>
>> >>> 在 2020年7月13日,19:59,Yvette zhai  写道:
>> >>>
>> >>> 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
>> >>> 下载的flink-1.11.0-bin-scala_2.11.tgz
>> >>>
>> >>> 执行命令是
>> >>> ./bin/kubernetes-session.sh \
>> >>>-Dkubernetes.cluster-id=k8s-session-1 \
>> >>>-Dtaskmanager.memory.process.size=4096m \
>> >>>-Dkubernetes.taskmanager.cpu=2 \
>> >>>-Dtaskmanager.numberOfTaskSlots=4 \
>> >>>-Dresourcemanager.taskmanager-timeout=360 \
>> >>>-Dkubernetes.container.image=flink:1.11.0-scala_2.11
>> >>>
>> >>> 但是会报错,找不到configmap
>> >>>
>> >>>
>> >>> 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
>> >>>
>> >>
>> >>
>>
>>


Re: flink-benchmarks使用求助

2020-07-13 文章 zilong xiao
`-t max`之后出现的~ 改小并发后貌似没问题

Congxian Qiu  于2020年7月13日周一 下午8:14写道:

> Hi
>
> 没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢?
>
> Best,
> Congxian
>
>
> zilong xiao  于2020年7月13日周一 下午2:32写道:
>
> > 是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown
> > timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗?
> >
> > Congxian Qiu  于2020年7月10日周五 下午7:18写道:
> >
> > > Hi
> > > 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme 能跑出一个结果(csv
> > > 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的可以阅读 JMH 的相关文档[2]
> > >
> > > [1] https://github.com/dataArtisans/flink-benchmarks
> > > [2] http://openjdk.java.net/projects/code-tools/jmh/
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > zilong xiao  于2020年7月10日周五 下午3:54写道:
> > >
> > > >
> 如题,最近在新机器上跑flink-benchmarks验证下机器性能,但是不太会对跑出的结果进行分析,不知是否有大神也用过这个,可否指点一二
> > > >
> > >
> >
>


Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Leonard Xu
Hi,

> 在 2020年7月14日,09:52,Zhou Zach  写道:
> 
>>>   |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
>>> cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,

看下这个抽取出来的rowkey是否有重复的呢?

祝好,
Leonard Xu

Re:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach






Hi, Leonard
我设置了 'connector.write.buffer-flush.interval' = ‘1s',然后重启运行程序,
再消息发送刚开始,比如说发送了4条,hive和hbase接收的消息都是4条,再消息发送48条的时候,我停止了producer,
再去查结果hbase是19条,hive是48条,如果说每1s钟flink查一下sink hbase 
buffer是不是到1mb,到了就sink,没到就不sink,但是这解释不了,为啥刚开始,hbase和hive接收到到数据是同步的,奇怪











在 2020-07-13 21:50:54,"Leonard Xu"  写道:
>Hi, Zhou
>
>
>>   'connector.write.buffer-flush.max-size' = '1mb',
>>   'connector.write.buffer-flush.interval' = ‘0s'
>
>(1) connector.write.buffer-flush.max-size这个配置项支持的单位只有mb,其他不支持,所以会报对应的错。这个参数用于 
>BufferredMutator 
>做buffer优化的参数,表示buffer存多大的size就触发写,flush.interval参数是按照多长的时间轮询写入,两个参数根据需要配合使用。当connector.write.buffer-flush.interval
> 设置为 0s 
>时,表示不会轮询,所以只会等connector.write.buffer-flush.max-size到最大size再写入。你把connector.write.buffer-flush.interval
> 设置成 1s 应该就能看到数据了。
>
>(2) Hbase connector 1.11.0 之前的版本只支持1.4.3,所以你填2.1.0会报错,在1.11.0开始支持为1.4.x, 
>所以1.11.0新的connector里支持的参数为’connector’ = ‘hbase-1.4’, 因为hbase 
>1.4.x版本API是兼容的,另外社区也在讨论支持HBase 2.x[1]
>
>
>Best,
>Leonard Xu
>[1] 
>http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674
> 
>
>
>
>> 在 2020年7月13日,21:09,Zhou Zach  写道:
>> 
>> 
>> 
>> flink订阅kafka消息,同时sink到hbase和hive中,
>> 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条
>> 
>> 
>> query:
>> streamTableEnv.executeSql(
>>  """
>>|
>>|CREATE TABLE hbase_table (
>>|rowkey VARCHAR,
>>|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>>|) WITH (
>>|'connector.type' = 'hbase',
>>|'connector.version' = '2.1.0',
>>|'connector.table-name' = 'ods:user_hbase6',
>>|'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>|'connector.zookeeper.znode.parent' = '/hbase',
>>|'connector.write.buffer-flush.max-size' = '1mb',
>>|'connector.write.buffer-flush.max-rows' = '1',
>>|'connector.write.buffer-flush.interval' = '0s'
>>|)
>>|""".stripMargin)
>> 
>>val statementSet = streamTableEnv.createStatementSet()
>>val insertHbase =
>>  """
>>|insert into hbase_table
>>|SELECT
>>|   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
>> cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>>|   ROW(sex, age, created_time ) as cf
>>|FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
>> created_time from kafka_table)
>>|
>>|""".stripMargin
>> 
>>statementSet.addInsertSql(insertHbase)
>> 
>>val insertHive =
>>  """
>>|
>>|INSERT INTO odsCatalog.ods.hive_table
>>|SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'), 
>> DATE_FORMAT(created_time, 'HH')
>>|FROM kafka_table
>>|
>>|""".stripMargin
>>statementSet.addInsertSql(insertHive)
>> 
>> 
>>statementSet.execute()
>> 
>> 
>> 是因为参数'connector.write.buffer-flush.max-size' = 
>> '1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下:
>> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
>> bytes) value but was: 1kb
>> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
>> bytes) value but was: 10b
>> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
>> bytes) value but was: 1
>> 
>> 
>> 
>> 
>> 
>> 
>> 并且,按照官网文档
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
>> 
>> 
>> 设置参数也不识别,报错:
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find 
>> any factory for identifier 'hbase-2.1.0' that implements 
>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
>> 
>> 
>> 看了一下源码,
>> org.apache.flink.table.descriptors.HBaseValidator
>> public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
>>public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
>>public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
>>public static final String CONNECTOR_ZK_QUORUM = 
>> "connector.zookeeper.quorum";
>>public static final String CONNECTOR_ZK_NODE_PARENT = 
>> "connector.zookeeper.znode.parent";
>>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = 
>> "connector.write.buffer-flush.max-size";
>>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = 
>> "connector.write.buffer-flush.max-rows";
>>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = 
>> "connector.write.buffer-flush.interval";
>> 参数还是老参数
>


Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
rowtime 定义如下,我发现 SchemaValidator#deriveFieldMapping 方法给移除了。
Rowtime rowtime = new Rowtime()
.timestampsFromField("searchTime")
.watermarksPeriodicBounded(5 * 1000);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: flink on yarn日志问题

2020-07-13 文章 王松
我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。

Yangze Guo  于2020年7月13日周一 下午5:03写道:

> 1.
> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
>
> Best,
> Yangze Guo
>
>
> On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
> >
> > 不好意思  怪我灭有描述清楚
> > 1 目前开启日志收集功能
> > 2 目前已是 per-job模式
> > 3 集群使用cdh flink.1.10
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
> > >Hi,
> > >
> > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> > >
> > >第二个问题,您可以尝试一下per-job mode [2][3]
> > >
> > >[1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > >[2]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > >[3]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > >
> > >
> > >Best,
> > >Yangze Guo
> > >
> > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
> > >>
> > >> 请问一下两个问题
> > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
> ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
> 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
> > >>
>


?????? ??Flink Join??????????

2020-07-13 文章 ????????
Hi:


interval joinkey?
interval join??join??regular 
join??stream??key?
.




----
??: 
   "user-zh"

<17626017...@163.com;
:2020??7??6??(??) 11:12
??:"user-zh"

Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Leonard Xu
Hi, Zhou


>   'connector.write.buffer-flush.max-size' = '1mb',
>   'connector.write.buffer-flush.interval' = ‘0s'

(1) connector.write.buffer-flush.max-size这个配置项支持的单位只有mb,其他不支持,所以会报对应的错。这个参数用于 
BufferredMutator 
做buffer优化的参数,表示buffer存多大的size就触发写,flush.interval参数是按照多长的时间轮询写入,两个参数根据需要配合使用。当connector.write.buffer-flush.interval
 设置为 0s 
时,表示不会轮询,所以只会等connector.write.buffer-flush.max-size到最大size再写入。你把connector.write.buffer-flush.interval
 设置成 1s 应该就能看到数据了。

(2) Hbase connector 1.11.0 之前的版本只支持1.4.3,所以你填2.1.0会报错,在1.11.0开始支持为1.4.x, 
所以1.11.0新的connector里支持的参数为’connector’ = ‘hbase-1.4’, 因为hbase 
1.4.x版本API是兼容的,另外社区也在讨论支持HBase 2.x[1]


Best,
Leonard Xu
[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674
 



> 在 2020年7月13日,21:09,Zhou Zach  写道:
> 
> 
> 
> flink订阅kafka消息,同时sink到hbase和hive中,
> 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条
> 
> 
> query:
> streamTableEnv.executeSql(
>  """
>|
>|CREATE TABLE hbase_table (
>|rowkey VARCHAR,
>|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>|) WITH (
>|'connector.type' = 'hbase',
>|'connector.version' = '2.1.0',
>|'connector.table-name' = 'ods:user_hbase6',
>|'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>|'connector.zookeeper.znode.parent' = '/hbase',
>|'connector.write.buffer-flush.max-size' = '1mb',
>|'connector.write.buffer-flush.max-rows' = '1',
>|'connector.write.buffer-flush.interval' = '0s'
>|)
>|""".stripMargin)
> 
>val statementSet = streamTableEnv.createStatementSet()
>val insertHbase =
>  """
>|insert into hbase_table
>|SELECT
>|   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
> cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>|   ROW(sex, age, created_time ) as cf
>|FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
> created_time from kafka_table)
>|
>|""".stripMargin
> 
>statementSet.addInsertSql(insertHbase)
> 
>val insertHive =
>  """
>|
>|INSERT INTO odsCatalog.ods.hive_table
>|SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'), 
> DATE_FORMAT(created_time, 'HH')
>|FROM kafka_table
>|
>|""".stripMargin
>statementSet.addInsertSql(insertHive)
> 
> 
>statementSet.execute()
> 
> 
> 是因为参数'connector.write.buffer-flush.max-size' = 
> '1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下:
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
> bytes) value but was: 1kb
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
> bytes) value but was: 10b
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
> bytes) value but was: 1
> 
> 
> 
> 
> 
> 
> 并且,按照官网文档
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
> 
> 
> 设置参数也不识别,报错:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
> factory for identifier 'hbase-2.1.0' that implements 
> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
> 
> 
> 看了一下源码,
> org.apache.flink.table.descriptors.HBaseValidator
> public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
>public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
>public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
>public static final String CONNECTOR_ZK_QUORUM = 
> "connector.zookeeper.quorum";
>public static final String CONNECTOR_ZK_NODE_PARENT = 
> "connector.zookeeper.znode.parent";
>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = 
> "connector.write.buffer-flush.max-size";
>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = 
> "connector.write.buffer-flush.max-rows";
>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = 
> "connector.write.buffer-flush.interval";
> 参数还是老参数



flink state

2020-07-13 文章 Robert.Zhang
Hello,all
目前stream中遇到一个问题,
想使用一个全局的state 在所有的keyed stream中使用,或者global 
parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream 
operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽


Best regards

Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 godfrey he
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

TableEnvironment.executeSql() 和 StatementSet.execute()
提交的作业都是异步的,如果是在本地测试的话,不会等有最终结果才会推出。针对这个问题,1.12里准备引入 await 方法
[3],代码还在review中。

TableResult是用来描述一个statement执行的结果。对于SELECT和INSERT,TableResult中还包含了JobClient
[4]
用来操作对应的job,例如获取job状态,cancel作业,等待作业结束等。TableResult还可以collect方法拿到statement执行的schema和结果数据,例如
select/show的结果。


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
[3] https://issues.apache.org/jira/browse/FLINK-18337
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API

Best,
Godfrey


小学生 <201782...@qq.com> 于2020年7月13日周一 下午9:12写道:

> 嗯嗯,尝试了,这下没问题了,想问下这个TableResult对象,设计的目的是啥呢,不是特别懂呢,谢谢!
>


回复: 使用Flink Array Field Type

2020-07-13 文章 叶贤勋
谢谢 Leonard的解答。刚刚也看到了这个jira单[1]


[1] https://issues.apache.org/jira/browse/FLINK-17847
| |
叶贤勋
|
|
yxx_c...@163.com
|
签名由网易邮箱大师定制


在2020年07月13日 20:50,Leonard Xu 写道:
Hi,

SQL 中数据下标是从1开始的,不是从0,所以会有数组越界问题。建议使用数组时通过 select arr[5] from T where 
CARDINALITY(arr) >= 5 这种方式防止数组访问越界。


祝好,
Leonard Xu

在 2020年7月13日,20:34,叶贤勋  写道:

test_array_string[0]



Re: flink 1.11??????????mysql????

2020-07-13 文章 ??????
??TableResult

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Jark Wu
你的源码中 new
Schema().field("searchTime",DataTypes.TIMESTAMP()).rowtime(rowtime);
里面的 rowtime 的定义能贴下吗?

On Mon, 13 Jul 2020 at 20:53, Hito Zhu  wrote:

> Hi Jark 异常信息如下:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field null does not exist
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$mapToResolvedField$4(TimestampExtractorUtils.java:85)
> at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.mapToResolvedField(TimestampExtractorUtils.java:85)
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$getAccessedFields$0(TimestampExtractorUtils.java:58)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
>
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
> at
>
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> at
> java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:73)
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:65)
> at
>
> org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:244)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:119)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:118)
> at scala.Option.map(Option.scala:146)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:118)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
> at
>
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
> at
>
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
>
> 

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
1. 执行的脚本,产生的日志是:
2020-07-13 21:00:25,248 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.rpc.address, localhost
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.rpc.port, 6123
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.memory.process.size, 1600m
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.memory.process.size, 1728m
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: parallelism.default, 1
2020-07-13 21:00:25,252 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-07-13 21:00:25,344 INFO
 org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] -
Could not load factory due to missing dependencies.
2020-07-13 21:00:26,136 INFO
 org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
less than its min value 192.000mb (201326592 bytes), min value will be used
instead
2020-07-13 21:00:26,154 INFO
 org.apache.flink.kubernetes.utils.KubernetesUtils[] -
Kubernetes deployment requires a fixed port. Configuration blob.server.port
will be set to 6124
2020-07-13 21:00:26,154 INFO
 org.apache.flink.kubernetes.utils.KubernetesUtils[] -
Kubernetes deployment requires a fixed port. Configuration
taskmanager.rpc.port will be set to 6122
2020-07-13 21:00:26,204 INFO
 org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
less than its min value 192.000mb (201326592 bytes), min value will be used
instead
2020-07-13 21:00:26,220 WARN
 org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
[] - Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop
Configuration ConfigMap.
2020-07-13 21:00:26,220 WARN
 org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
[] - Found 0 files in directory null/etc/hadoop, skip to create the Hadoop
Configuration ConfigMap.
2020-07-13 21:00:26,958 INFO
 org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
flink session cluster k8s-session-1 successfully, JobManager Web Interface:
http://172.16.5.175:8081

2. 查看 desrcibe 日志是:
MountVolume.SetUp failed for volume "flink-config-volume" : configmap
"flink-config-k8s-session-1" not found

3. logs 日志是:

Start command : /bin/bash -c $JAVA_HOME/bin/java -classpath
$FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824
-XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log
-Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j.properties
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
1> /opt/flink/log/jobmanager.out 2> /opt/flink/log/jobmanager.err

4. kubectl get cm 可以看到
NAME DATA   AGE
flink-config-k8s-session-1   3  5m45s

麻烦大佬帮忙看看~是不是我的语句有问题还是缺什么文件~
我是直接官网下的包,没有改任何文件~

Leonard Xu  于2020年7月13日周一 下午8:41写道:

> Hi, zhai
>
> 可以贴详细点吗?我帮你 CC 了熟悉这块的大佬 Yun Gao
>
> 祝好
>
> > 在 2020年7月13日,20:11,Yvette zhai  写道:
> >
> > 报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> > "flink-config-k8s-session-1" not found
> >
> >
> > Leonard Xu  于2020年7月13日周一 下午8:03写道:
> >
> >> Hi, zhai
> >>
> >> 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。
> >>
> >> Best,
> >> Leonard Xu
> >>
> >>> 在 2020年7月13日,19:59,Yvette zhai  写道:
> >>>
> >>> 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
> >>> 下载的flink-1.11.0-bin-scala_2.11.tgz
> >>>
> >>> 执行命令是
> >>> ./bin/kubernetes-session.sh \
> >>>-Dkubernetes.cluster-id=k8s-session-1 \
> >>>-Dtaskmanager.memory.process.size=4096m \
> >>>-Dkubernetes.taskmanager.cpu=2 \
> >>>-Dtaskmanager.numberOfTaskSlots=4 \
> >>>-Dresourcemanager.taskmanager-timeout=360 \
> >>>-Dkubernetes.container.image=flink:1.11.0-scala_2.11
> >>>
> >>> 但是会报错,找不到configmap
> >>>
> >>>
> >>> 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
> >>>
> >>
> >>
>
>


flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach


flink订阅kafka消息,同时sink到hbase和hive中,
当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条


query:
streamTableEnv.executeSql(
  """
|
|CREATE TABLE hbase_table (
|rowkey VARCHAR,
|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
|) WITH (
|'connector.type' = 'hbase',
|'connector.version' = '2.1.0',
|'connector.table-name' = 'ods:user_hbase6',
|'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
|'connector.zookeeper.znode.parent' = '/hbase',
|'connector.write.buffer-flush.max-size' = '1mb',
|'connector.write.buffer-flush.max-rows' = '1',
|'connector.write.buffer-flush.interval' = '0s'
|)
|""".stripMargin)

val statementSet = streamTableEnv.createStatementSet()
val insertHbase =
  """
|insert into hbase_table
|SELECT
|   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
|   ROW(sex, age, created_time ) as cf
|FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
created_time from kafka_table)
|
|""".stripMargin

statementSet.addInsertSql(insertHbase)

val insertHive =
  """
|
|INSERT INTO odsCatalog.ods.hive_table
|SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'), 
DATE_FORMAT(created_time, 'HH')
|FROM kafka_table
|
|""".stripMargin
statementSet.addInsertSql(insertHive)


statementSet.execute()


是因为参数'connector.write.buffer-flush.max-size' = 
'1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下:
Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
bytes) value but was: 1kb
Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
bytes) value but was: 10b
Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
bytes) value but was: 1






并且,按照官网文档
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html


设置参数也不识别,报错:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'hbase-2.1.0' that implements 
'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.


看了一下源码,
org.apache.flink.table.descriptors.HBaseValidator
public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
public static final String CONNECTOR_ZK_QUORUM = 
"connector.zookeeper.quorum";
public static final String CONNECTOR_ZK_NODE_PARENT = 
"connector.zookeeper.znode.parent";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = 
"connector.write.buffer-flush.max-size";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = 
"connector.write.buffer-flush.max-rows";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = 
"connector.write.buffer-flush.interval";
参数还是老参数

Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 Leonard Xu
Hi,知道了
source.execute_insert("g_source_tab”) 
返回的结果是一个TableResult对象,如果不显示地等待任务的执行,这个任务会直接返回,你试下这个

result.execute_insert("g_source_tab") \
.get_job_client() \
.get_job_execution_result() \
.result()

这是Flip-84引入的一个改动,为了更好地处理table程序的返回值。

祝好,
Leonard Xu

> 在 2020年7月13日,20:57,小学生 <201782...@qq.com> 写道:
> 
> 不像吧,这个是1.10版的,我执行这个程序很快就结束了,不会挂着。



Re: flink 1.11??????????mysql????

2020-07-13 文章 ??????
??1.10

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 王松
感谢大家的热情解答,最后问题解决了。原因正是 Leonard Xu所说的,我应该引入的是
flink-sql-connector-kafka-${version}_${scala.binary.version},然后当时改成
flink-sql-connector-kafka
后继续报错的原因是:我还在pom文件中引入了flink-table-planner-blink,如下:

org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}

添加provided后就没有问题了。

最后附上正确的pom文件 (如 Jingsong
所说,也可以把flink-sql-connector-kafka、flink-json这些都在pom文件中去掉,直接将jar报放入lib中):




org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}



org.apache.flink
flink-json
${flink.version}


org.apache.flink

flink-sql-connector-kafka_${scala.binary.version}
${flink.version}


org.apache.flink
flink-core
${flink.version}


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}




Jingsong Li  于2020年7月13日周一 下午4:35写道:

> Hi,
>
> 1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1]
>
> 2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java
> spi)
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> Best,
> Jingsong
>
> On Mon, Jul 13, 2020 at 4:04 PM 王松  wrote:
>
> > 你好本超,
> > 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的
> >
> > Benchao Li  于2020年7月13日周一 下午3:42写道:
> >
> > > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> > > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> > > 当然,直接粗暴的放到lib下,也是可以的。
> > >
> > > Leonard Xu  于2020年7月13日周一 下午3:38写道:
> > >
> > > > Hi
> > > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> > > >
> > > > 祝好
> > > >
> > > > > 在 2020年7月13日,15:28,王松  写道:
> > > > >
> > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > > > >
> > > > > 我机器上flink/lib下jar包如下:
> > > > > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41
> > flink-avro-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09
> > flink-csv-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > > > flink-dist_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09
> > flink-json-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > > > flink-shaded-zookeeper-3.4.14.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > > > flink-table_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > > > flink-table-blink_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> > > > log4j-1.2-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09
> > log4j-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09
> > log4j-core-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > > > > log4j-slf4j-impl-2.12.1.jar
> > > > >
> > > > > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> > > > >
> > > > >> Hi,
> > > > >> flink-connector-kafka_${scala.binary.version 和
> > > > >> flink-sql-connector-kafka_${scala.binary.version
> > > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > > > >>
> > > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > > > >>
> > > > >> 祝好
> > > > >> Leonard Xu
> > > > >>
> > > > >>> 在 2020年7月13日,14:42,王松  写道:
> > > > >>>
> > > > >>> @Leonard Xu,
> > > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > > >>> =
> > > > >>> 
> > > > >>>   org.apache.flink
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> flink-sql-connector-kafka_${scala.binary.version}
> > > > >>>   ${flink.version}
> > > > >>>   
> > > > >>>
> > > > >>>
> > > > >>>   
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> 
> > > > >>>   
> > > > >>>   
> > > > >>>   
> > > > >>>   
> > > > >>>
> > > > >> 
> > > > >>>   
> > > > >>>   
> > > > >>>   
> > > > >>>
> > > > >>>   
> > > > >>>   
> > > > >>>
> >  
> > > > >>>   
> > > > >>>   
> > > > >>> =
> > > > >>>
> > > > >>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> > > > >>>
> > > >  Hi, 王松
> > > > 
> > > >  这个报错是pom中缺少了 Kafka SQL 

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
Hi Jark 异常信息如下:
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Field null does not exist
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$mapToResolvedField$4(TimestampExtractorUtils.java:85)
at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.mapToResolvedField(TimestampExtractorUtils.java:85)
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$getAccessedFields$0(TimestampExtractorUtils.java:58)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at 
java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:73)
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:65)
at
org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:244)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:119)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:118)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:118)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:321)
at

回复: flink 1.11运算结果存mysql出错

2020-07-13 文章 Zhonghan Tang
有新数据进来吗,看起来和这个jira很像
https://issues.apache.org/jira/browse/FLINK-15262




在2020年07月13日 20:38,Leonard Xu 写道:
Hi,

简单看了下代码应该没啥问题,alarm_test_g 这个kafka 
topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点

Best,
Leonard Xu

在 2020年7月13日,20:06,小学生 <201782...@qq.com> 写道:

各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。代码如下,是在linux下,直接python 
*.py执行的。完整代码如下


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
id VARCHAR, 
alarm_id VARCHAR, 
trck_id VARCHAR


) WITH (
'connector' = 'kafka',
'topic' = 'alarm_test_g', 
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '10.2.2.73:2181',
'properties.bootstrap.servers' = '10.2.2.73:9092',
'format' = 'json'
)
"""

sink="""
CREATE TABLE g_source_tab (
id VARCHAR, 
alarm_id VARCHAR,  
trck_id VARCHAR


) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
'table-name' = 'g', 
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
.select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")


Re: 使用Flink Array Field Type

2020-07-13 文章 Leonard Xu
Hi, 

SQL 中数据下标是从1开始的,不是从0,所以会有数组越界问题。建议使用数组时通过 select arr[5] from T where 
CARDINALITY(arr) >= 5 这种方式防止数组访问越界。


祝好,
Leonard Xu

> 在 2020年7月13日,20:34,叶贤勋  写道:
> 
> test_array_string[0]



Re: flink 1.11??????????mysql????

2020-07-13 文章 ??????
topic??,??flink1.10??insert_into

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Leonard Xu
Hi, zhai

可以贴详细点吗?我帮你 CC 了熟悉这块的大佬 Yun Gao

祝好

> 在 2020年7月13日,20:11,Yvette zhai  写道:
> 
> 报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> "flink-config-k8s-session-1" not found
> 
> 
> Leonard Xu  于2020年7月13日周一 下午8:03写道:
> 
>> Hi, zhai
>> 
>> 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。
>> 
>> Best,
>> Leonard Xu
>> 
>>> 在 2020年7月13日,19:59,Yvette zhai  写道:
>>> 
>>> 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
>>> 下载的flink-1.11.0-bin-scala_2.11.tgz
>>> 
>>> 执行命令是
>>> ./bin/kubernetes-session.sh \
>>>-Dkubernetes.cluster-id=k8s-session-1 \
>>>-Dtaskmanager.memory.process.size=4096m \
>>>-Dkubernetes.taskmanager.cpu=2 \
>>>-Dtaskmanager.numberOfTaskSlots=4 \
>>>-Dresourcemanager.taskmanager-timeout=360 \
>>>-Dkubernetes.container.image=flink:1.11.0-scala_2.11
>>> 
>>> 但是会报错,找不到configmap
>>> 
>>> 
>>> 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
>>> 
>> 
>> 



Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 Leonard Xu
Hi,

简单看了下代码应该没啥问题,alarm_test_g 这个kafka 
topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点

Best,
Leonard Xu

> 在 2020年7月13日,20:06,小学生 <201782...@qq.com> 写道:
> 
> 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。代码如下,是在linux下,直接python
>  *.py执行的。完整代码如下
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment, 
> TimeCharacteristic, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> source="""
> CREATE TABLE kafka_source_tab (
> id VARCHAR, 
> alarm_id VARCHAR, 
> trck_id VARCHAR
> 
> 
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alarm_test_g', 
> 'scan.startup.mode' = 'earliest-offset', 
> 'properties.bootstrap.servers' = '10.2.2.73:2181',
> 'properties.bootstrap.servers' = '10.2.2.73:9092',
> 'format' = 'json' 
> )
> """
> 
> sink="""
> CREATE TABLE g_source_tab (
> id VARCHAR,  
> alarm_id VARCHAR,  
> trck_id VARCHAR
> 
> 
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
> 'table-name' = 'g', 
> 'username' = 'root',
> 'password' = '123456t',
> 'sink.buffer-flush.interval' = '1s'
> )
> """
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> env.set_parallelism(1)
> env_settings = 
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
> 
> 
> 
> t_env.execute_sql(source)
> t_env.execute_sql(sink)
> 
> 
> source = t_env.from_path("kafka_source_tab")\
> .select("id,alarm_id,trck_id")
> source.execute_insert("g_source_tab")



使用Flink Array Field Type

2020-07-13 文章 叶贤勋
Flink 1.10.0
问题描述:source表中有个test_array_string 
ARRAY字段,在DML语句用test_array_string[0]获取数组中的值会报数组越界异常。另外测试过Array也是相同错误,Array,Array等类型也会报数组越界问题。
请问这是Flink1.10的bug吗?


SQL:
CREATETABLE source (
……
test_array_string ARRAY
) WITH (
'connector.type'='kafka',
'update-mode'='append',
'format.type'='json'
  ……
);


CREATETABLE sink(
v_string string
) WITH (
  ……
);


INSERTINTO
sink
SELECT
test_array_string[0] as v_string
from
source;


kafka样例数据:{"id":1,"test_array_string":["ff”]}


Flink 执行的时候报以下错误:
java.lang.ArrayIndexOutOfBoundsException: 33554432
at 
org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598)
at 
org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590)
at 
org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534)
at 
org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117)
at StreamExecCalc$9.processElement(UnknownSource)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at SourceConversion$1.processElement(UnknownSource)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:408)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)


| |
叶贤勋
|
|
yxx_c...@163.com
|
签名由网易邮箱大师定制



Re:Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
好的,感谢答疑

















在 2020-07-13 19:49:10,"Jingsong Li"  写道:
>创建kafka_table需要在default dialect下。
>
>不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法)
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach  wrote:
>
>> 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了,
>> 如果是default Dialect创建的表,是不是只是在临时会话有效
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-13 19:27:44,"Jingsong Li"  写道:
>> >Hi,
>> >
>> >问题一:
>> >
>> >只要current catalog是HiveCatalog。
>> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
>> >
>> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
>> >
>> >问题二:
>> >
>> >用filesystem创建出来的是filesystem的表,它和hive
>> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
>> >
>> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
>> >但是它的partition commit是不支持metastore的,所以不会有自动add
>> >partition到hive的默认实现,你需要自定义partition-commit-policy.
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:
>> >
>> >> 尴尬
>> >> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
>> >> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
>> >> 还有两个问题问下,
>> >> 问题1:
>> >> 创建的kafka_table,在hive和Flink
>> >>
>> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 问题2:
>> >> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
>> >> java.util.concurrent.CompletionException:
>> >>
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> >> Could not execute application.
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >> at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >> at
>> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >> [?:1.8.0_161]
>> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> >> [?:1.8.0_161]
>> >> at
>> >>
>> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >>
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >>
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >>
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >> [qile-data-flow-1.0.jar:?]
>> >> Caused by:
>> >>
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> >> Could not execute application.
>> >> ... 11 more
>> >> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> The
>> >> main method caused an error: Unable to create a sink for writing table
>> >> 'default_catalog.default_database.hive_table1'.
>> >>
>> >> Table options are:
>> >>
>> >> 'connector'='filesystem'
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='0s'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> at
>> >>
>> 

Re: flink-benchmarks使用求助

2020-07-13 文章 Congxian Qiu
Hi

没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢?

Best,
Congxian


zilong xiao  于2020年7月13日周一 下午2:32写道:

> 是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown
> timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗?
>
> Congxian Qiu  于2020年7月10日周五 下午7:18写道:
>
> > Hi
> > 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme 能跑出一个结果(csv
> > 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的可以阅读 JMH 的相关文档[2]
> >
> > [1] https://github.com/dataArtisans/flink-benchmarks
> > [2] http://openjdk.java.net/projects/code-tools/jmh/
> >
> > Best,
> > Congxian
> >
> >
> > zilong xiao  于2020年7月10日周五 下午3:54写道:
> >
> > > 如题,最近在新机器上跑flink-benchmarks验证下机器性能,但是不太会对跑出的结果进行分析,不知是否有大神也用过这个,可否指点一二
> > >
> >
>


Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap
"flink-config-k8s-session-1" not found


Leonard Xu  于2020年7月13日周一 下午8:03写道:

> Hi, zhai
>
> 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。
>
> Best,
> Leonard Xu
>
> > 在 2020年7月13日,19:59,Yvette zhai  写道:
> >
> > 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
> > 下载的flink-1.11.0-bin-scala_2.11.tgz
> >
> > 执行命令是
> > ./bin/kubernetes-session.sh \
> > -Dkubernetes.cluster-id=k8s-session-1 \
> > -Dtaskmanager.memory.process.size=4096m \
> > -Dkubernetes.taskmanager.cpu=2 \
> > -Dtaskmanager.numberOfTaskSlots=4 \
> > -Dresourcemanager.taskmanager-timeout=360 \
> > -Dkubernetes.container.image=flink:1.11.0-scala_2.11
> >
> > 但是会报错,找不到configmap
> >
> >
> > 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
> >
>
>


Re: Re: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-13 文章 Congxian Qiu
Hi  程龙

如果可以的话,也麻烦使用 1.11.0 测试下看问题是否还存在。

Best,
Congxian


程龙 <13162790...@163.com> 于2020年7月13日周一 上午10:45写道:

>
>
>
>
>
>
> 问题不是很常见 ,但是同一个任务,提交在flink1.10 和 flink1.10.1上都会复现, 准备尝试一下升级一下jdk试试
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-06 16:11:17,"Congxian Qiu"  写道:
> >@chenkaibit 多谢你的回复~
> >
> >Best,
> >Congxian
> >
> >
> >chenkaibit  于2020年7月6日周一 下午3:53写道:
> >
> >> hi,Congxian。我在发现这个问题时也很奇怪,但是在打印了一些日志后,确实验证了我的想法。因为 <低版本jdk+flink1.9> 和
> >> <高版本jdk+1.10> 都不会抛 NPE(见 FLINK-17479),我猜测和 lambda 表达式中外部变量的垃圾回收机制以及 1.10
> >> 引入的 MailBox 模型有关,外部 checkpointMetaData 实例被意外回收了。所以在修复的 patch 中我在 lambda
> >> 表达式内部实例化了一个新的 checkpointMetaData,目前看这个方法是有效的,没有再发现过
> >> NPE。这是个临时的修复方法,根本原因可能还需要进一步分析。
> >>
> >>
> >> --
> >> Best, yuchuan
> >>
> >>
> >>
> >> 在 2020-07-06 14:04:58,"Congxian Qiu"  写道:
> >> >@陈凯 感谢你分享的这个方法,比较好奇这两个的区别是什么?修改后的 patch 在 closure 中一开始 copy 了一份
> >> >CheckpointMeta,也就是说 845 - 867 行之间,之前的 checkpointMeta 会变为 null,这个比较奇怪。
> >> >
> >> >Best,
> >> >Congxian
> >> >
> >> >
> >> >陈凯  于2020年7月6日周一 上午9:53写道:
> >> >
> >> >>
> >> >> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
> >> >> 我之前提了个jira 描述了这个问题
> >> >> https://issues.apache.org/jira/browse/FLINK-18196
> >> >>
> >> >> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk
> 版本,可以参考下面的patch:
> >> >>
> >> >>
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
> >> >>
> >> >>
> >> >>
> >> >> -邮件原件-
> >> >> 发件人: zhisheng 
> >> >> 发送时间: 2020年7月5日 15:01
> >> >> 收件人: user-zh 
> >> >> 主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常
> >> >>
> >> >> 生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian
> >> >>
> >> >> Best!
> >> >> zhisheng
> >> >>
> >> >> Congxian Qiu  于2020年7月4日周六 下午3:21写道:
> >> >>
> >> >> > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
> >> >> >
> >> >> > Best,
> >> >> > Congxian
> >> >> >
> >> >> >
> >> >> > zhisheng  于2020年7月4日周六 下午12:27写道:
> >> >> >
> >> >> > > 我们也有遇到过这个异常,但是不是很常见
> >> >> > >
> >> >> > > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
> >> >> > >
> >> >> > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> >> >> > > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
> >> >> > > > Best,
> >> >> > > > Congxian
> >> >> > > >
> >> >> > > >
> >> >> > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> >> >> > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> >> >> > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > > >| |
> >> >> > > > > >JasonLee
> >> >> > > > > >|
> >> >> > > > > >|
> >> >> > > > > >邮箱:17610775...@163.com
> >> >> > > > > >|
> >> >> > > > > >
> >> >> > > > > >Signature is customized by Netease Mail Master
> >> >> > > > > >
> >> >> > > > > >在2020年07月01日 20:43,程龙 写道:
> >> >> > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for
> >> >> operator
> >> >> > > > > Filter -> Map (2/8).
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> >> >> >
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> 

flink 1.11??????????mysql????

2020-07-13 文章 ??????
flink??Kafka??mysqlmysqllinuxpython
 *.py


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
id VARCHAR, 
alarm_id VARCHAR, 
trck_id VARCHAR


) WITH (
'connector' = 'kafka',
'topic' = 'alarm_test_g', 
'scan.startup.mode' = 'earliest-offset', 
'properties.bootstrap.servers' = '10.2.2.73:2181',
'properties.bootstrap.servers' = '10.2.2.73:9092',
'format' = 'json' 
)
"""

sink="""
CREATE TABLE g_source_tab (
id VARCHAR,  
alarm_id VARCHAR,  
trck_id VARCHAR


) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
'table-name' = 'g', 
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
.select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Leonard Xu
Hi, zhai

图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。

Best,
Leonard Xu

> 在 2020年7月13日,19:59,Yvette zhai  写道:
> 
> 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
> 下载的flink-1.11.0-bin-scala_2.11.tgz
> 
> 执行命令是
> ./bin/kubernetes-session.sh \
> -Dkubernetes.cluster-id=k8s-session-1 \
> -Dtaskmanager.memory.process.size=4096m \
> -Dkubernetes.taskmanager.cpu=2 \
> -Dtaskmanager.numberOfTaskSlots=4 \
> -Dresourcemanager.taskmanager-timeout=360 \
> -Dkubernetes.container.image=flink:1.11.0-scala_2.11
> 
> 但是会报错,找不到configmap
> 
> 
> 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
> 



Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 文章 Congxian Qiu
Hi Zhefu

感谢你在邮件列表分享你的解决方法,这样其他人遇到类似问题也有一个参考。

Best,
Congxian


Zhefu PENG  于2020年7月13日周一 下午7:51写道:

> Hi all,
>
> 这封邮件最开始发出已经一个月了,这一个月里尝试了很多朋友或者各位大佬的建议,目前经过一周末加上两个工作日的查看,问题看来是解决了。
>
>
> 问题的根本原因:Kafka集群的性能不足(怀疑是CPU负荷过大)。问题出现的时候线上kakfa集群只有七台机器,在排除所有别的原因以及能进行到的尝试方案后,决定进行扩容。扩到15台机器。目前来看,平稳运行,没有再报出类似错误。
>
> 反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。
>
> Best,
> Zhefu
>
> LakeShen  于2020年6月12日周五 上午9:49写道:
>
> > Hi ZheFu,
> >
> > 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
> > 的数据是否都已经 Sink 到了 kafka.
> >
> > 也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
> > 还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。
> >
> > 具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。
> >
> > Best,
> > LakeShen
> >
> > Congxian Qiu  于2020年6月11日周四 上午9:50写道:
> >
> > > Hi
> > >
> > > 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
> > > java.lang.IllegalStateException: Pending record count must be zero at
> > this
> > > point: 5”,需要看一下为什么会走到这里
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:
> > >
> > > >
> > > >
> > >
> >
> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
> > > >
> > > > > 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> > > > >
> > > > > 补充一下,在TaskManager发现了如下错误日志:
> > > > >
> > > > > 2020-06-10 12:44:40,688 ERROR
> > > > > org.apache.flink.streaming.runtime.tasks.StreamTask   -
> Error
> > > > > during disposal of stream operator.
> > > > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> > Failed
> > > > to
> > > > > send data to Kafka: Pending record count must be zero at this
> point:
> > 5
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> > > > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> > > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> > > > > at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by: java.lang.IllegalStateException: Pending record count
> must
> > > be
> > > > > zero at this point: 5
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> > > > > ... 8 more
> > > > >
> > > > > 希望得到帮助,感谢!
> > > > >
> > > > >
> > > > > Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> > > > >> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
> > > > Field_Filter
> > > > >> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink:
> Unnamed
> > > > >>
> > > > >>
> > > > >>
> > > >
> > >
> >
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
> > > > >>
> > > > >> 部分报错信息如下:
> > > > >> 2020-06-10 12:02:49,083 INFO
> > > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > > Triggering
> > > > >> checkpoint 1 @ 1591761769060 for job
> > c41f4811262db1c4c270b136571c8201.
> > > > >> 2020-06-10 12:04:47,898 INFO
> > > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > Decline
> > > > >> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> > > > >> c41f4811262db1c4c270b136571c8201 at
> > > > >> container_e27_1591466310139_21670_01_06 @
> > > > >> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> > > > >> 2020-06-10 12:04:47,899 INFO
> > > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > > Discarding
> > > > >> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> > > > >> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> > > > >> complete snapshot 1 for operator Source: Custom Source -> Map ->
> > > > Source_Map
> > > > >> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter ->
> Map
> > > ->
> > > > 

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 文章 Leonard Xu


> 反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。
> 
> Best,
> Zhefu

谢谢 zhefu,  给你大大点赞,很社区的方式,相信这样的积累越多,小伙伴们都能学习到更多。

祝好,
Leonard Xu
 


> 
> LakeShen  于2020年6月12日周五 上午9:49写道:
> 
>> Hi ZheFu,
>> 
>> 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
>> 的数据是否都已经 Sink 到了 kafka.
>> 
>> 也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
>> 还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。
>> 
>> 具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。
>> 
>> Best,
>> LakeShen
>> 
>> Congxian Qiu  于2020年6月11日周四 上午9:50写道:
>> 
>>> Hi
>>> 
>>> 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
>>> java.lang.IllegalStateException: Pending record count must be zero at
>> this
>>> point: 5”,需要看一下为什么会走到这里
>>> 
>>> Best,
>>> Congxian
>>> 
>>> 
>>> 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:
>>> 
 
 
>>> 
>> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
 
> 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> 
> 补充一下,在TaskManager发现了如下错误日志:
> 
> 2020-06-10 12:44:40,688 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> during disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>> Failed
 to
> send data to Kafka: Pending record count must be zero at this point:
>> 5
> at
> 
 
>>> 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> at
> 
 
>>> 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> at
> 
 
>>> 
>> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at
> 
 
>>> 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> at
> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> at
> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must
>>> be
> zero at this point: 5
> at
> 
 
>>> 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> at
> 
 
>>> 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> ... 8 more
> 
> 希望得到帮助,感谢!
> 
> 
> Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> 
>> Hi all,
>> 
>> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
>> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
 Field_Filter
>> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
>> 
>> 
>> 
 
>>> 
>> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
>> 
>> 部分报错信息如下:
>> 2020-06-10 12:02:49,083 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
 Triggering
>> checkpoint 1 @ 1591761769060 for job
>> c41f4811262db1c4c270b136571c8201.
>> 2020-06-10 12:04:47,898 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>> Decline
>> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
>> c41f4811262db1c4c270b136571c8201 at
>> container_e27_1591466310139_21670_01_06 @
>> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
>> 2020-06-10 12:04:47,899 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
 Discarding
>> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> complete snapshot 1 for operator Source: Custom Source -> Map ->
 Source_Map
>> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map
>>> ->
 Map
>> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was
>>> declined.
>> at
>> 
 
>>> 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
>> at
>> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
>> at
>> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
>> at
>> 
 
>>> 
>> 

flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
下载的flink-1.11.0-bin-scala_2.11.tgz

执行命令是
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=k8s-session-1 \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=360 \
-Dkubernetes.container.image=flink:1.11.0-scala_2.11

但是会报错,找不到configmap
[image: image.png]

我看是执行上述命令是会生成configmap的,为什么还会报找不到。


Re: 滑动窗口数据存储多份问题

2020-07-13 文章 Congxian Qiu
Hi

从 HeapListState#add 这里看是的,我跟了一个 WindowOperator 到最终 HeapListState
的逻辑,这里确实是只有一份数据,没有拷贝。这个东西的实现可能是因为性能好,我尝试确认下这个原因,多谢你的提问。

Best,
Congxian


Jimmy Zhang <13669299...@163.com> 于2020年7月12日周日 上午8:13写道:

> Hi,all!
>
> 从WindowOperator.java的processElement方法跟进去,使用windowState.add(element.getValue());添加数据,这里面找到add方法的HeapListState类的实现,
>
>
> @Override
>  public void add(V value) {
>   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
>   final N namespace = currentNamespace;
>   final StateTable> map = stateTable;
>   List list = map.get(namespace);
>   if (list == null) {
>list = new ArrayList<>();
>map.put(namespace, list);
>   }
>   list.add(value);
>  }
> 就是这个方法,让我产生了 “此value只真实存在一份”的困惑!
> |
> Best,
> Jimmy
> |
> 签名由网易邮箱大师定制
> 在2020年7月11日 21:02,Congxian Qiu 写道:
> Hi
> 你说的 HeapListState 的困惑具体是什么呢?
>
> Best,
> Congxian
>
>
> Jimmy Zhang <13669299...@163.com> 于2020年7月11日周六 下午4:50写道:
>
> 嗯嗯,之前没有选择回复全部,不好意思。
>
>
> 我看源码关于RocksDB这块确实是需要序列化的,所以肯定是多份保存,如果状态后端是heap呢,也是一样的吗?从我测试内存来看,感觉也是多份,只是heapliststate那个类给了我一些困惑
>
>
> 在2020年07月11日 16:23,Congxian Qiu 写道:
> Hi
>
>
> 每个窗口都是一个单独的 state,至于你认为的不同 state 仅保持引用是不对的。这个你可以使用 RocksDBStateBackend
> 来考虑,RocksDBStateBackend 中会把 state 序列化成 bytes,然后写到 RocksDB 中,就是每个 State
> 中都会有一份。
>
>
> PS:回复邮件的时候可以选择「全部回复」这样就能够加上 "user-zh@flink.apache.org"),这样我们的邮件所有人都能看到了
>
>
> Best,
> Congxian
>
>
>
>
> 张浩  于2020年7月7日周二 上午10:34写道:
>
>
>
> Hi,我通过看源码发现每条数据到达时,是分配给了所有的窗口,但是我理解这单条数据是不是只是传递给了每个窗口,其实在内存中只有一份,窗口状态保持对它的引用,触发一次窗口就删掉对这些数据的引用?
> 很高兴与您探讨!
>
>
>
>
> | |
> 张浩
> |
> |
> 邮箱:zhanghao_w...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月06日 20:56,Congxian Qiu 写道:
> Hi
>
>
> 我理解,如果只存取一份的话,state 的管理会变得麻烦一些(所有需要这份数据的窗口都需要去某个地方取, state
> 什么时候清理逻辑也会变得麻烦一些)
>
>
> Best,
> Congxian
>
>
>
>
> 张浩  于2020年7月6日周一 下午1:57写道:
>
> 你好,我的思考是便于在状态信息中清除或者提取每一个窗口的数据信息。
> 不知道,我这样理解的对吗?
> 另外,为什么我们不能只存储一份数据呢?
> 非常感谢与您交流!
>
>
>
>
> | |
> 张浩
> |
> |
> 邮箱:zhanghao_w...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月06日 13:46,Congxian Qiu 写道:
> Hi
> 现在的实现是这样的,每条数据会在每个窗口中存一份
>
> Best,
> Congxian
>
>
> 张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道:
>
> Hi,all!
> 由于第一次咨询,我不确定上一份邮件大家是否收到。
> 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
> 份?
>
>
> | |
> 张浩
> |
> |
> 13669299...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 文章 Zhefu PENG
Hi all,

这封邮件最开始发出已经一个月了,这一个月里尝试了很多朋友或者各位大佬的建议,目前经过一周末加上两个工作日的查看,问题看来是解决了。

问题的根本原因:Kafka集群的性能不足(怀疑是CPU负荷过大)。问题出现的时候线上kakfa集群只有七台机器,在排除所有别的原因以及能进行到的尝试方案后,决定进行扩容。扩到15台机器。目前来看,平稳运行,没有再报出类似错误。

反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。

Best,
Zhefu

LakeShen  于2020年6月12日周五 上午9:49写道:

> Hi ZheFu,
>
> 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
> 的数据是否都已经 Sink 到了 kafka.
>
> 也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
> 还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。
>
> 具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。
>
> Best,
> LakeShen
>
> Congxian Qiu  于2020年6月11日周四 上午9:50写道:
>
> > Hi
> >
> > 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
> > java.lang.IllegalStateException: Pending record count must be zero at
> this
> > point: 5”,需要看一下为什么会走到这里
> >
> > Best,
> > Congxian
> >
> >
> > 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:
> >
> > >
> > >
> >
> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
> > >
> > > > 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> > > >
> > > > 补充一下,在TaskManager发现了如下错误日志:
> > > >
> > > > 2020-06-10 12:44:40,688 ERROR
> > > > org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> > > > during disposal of stream operator.
> > > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed
> > > to
> > > > send data to Kafka: Pending record count must be zero at this point:
> 5
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> > > > at
> > > >
> > >
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> > > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> > > > at java.lang.Thread.run(Thread.java:748)
> > > > Caused by: java.lang.IllegalStateException: Pending record count must
> > be
> > > > zero at this point: 5
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> > > > ... 8 more
> > > >
> > > > 希望得到帮助,感谢!
> > > >
> > > >
> > > > Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> > > >
> > > >> Hi all,
> > > >>
> > > >> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> > > >> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
> > > Field_Filter
> > > >> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
> > > >>
> > > >>
> > > >>
> > >
> >
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
> > > >>
> > > >> 部分报错信息如下:
> > > >> 2020-06-10 12:02:49,083 INFO
> > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > Triggering
> > > >> checkpoint 1 @ 1591761769060 for job
> c41f4811262db1c4c270b136571c8201.
> > > >> 2020-06-10 12:04:47,898 INFO
> > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > Decline
> > > >> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> > > >> c41f4811262db1c4c270b136571c8201 at
> > > >> container_e27_1591466310139_21670_01_06 @
> > > >> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> > > >> 2020-06-10 12:04:47,899 INFO
> > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > Discarding
> > > >> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> > > >> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> > > >> complete snapshot 1 for operator Source: Custom Source -> Map ->
> > > Source_Map
> > > >> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map
> > ->
> > > Map
> > > >> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was
> > declined.
> > > >> at
> > > >>
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> > > >> at
> > > >>
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> > > >> at
> > > >>
> > >
> >
> 

Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
创建kafka_table需要在default dialect下。

不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法)

Best,
Jingsong

On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach  wrote:

> 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了,
> 如果是default Dialect创建的表,是不是只是在临时会话有效
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 19:27:44,"Jingsong Li"  写道:
> >Hi,
> >
> >问题一:
> >
> >只要current catalog是HiveCatalog。
> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
> >
> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
> >
> >问题二:
> >
> >用filesystem创建出来的是filesystem的表,它和hive
> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
> >
> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
> >但是它的partition commit是不支持metastore的,所以不会有自动add
> >partition到hive的默认实现,你需要自定义partition-commit-policy.
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:
> >
> >> 尴尬
> >> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
> >> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
> >> 还有两个问题问下,
> >> 问题1:
> >> 创建的kafka_table,在hive和Flink
> >>
> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
> >>
> >>
> >>
> >>
> >>
> >>
> >> 问题2:
> >> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
> >> java.util.concurrent.CompletionException:
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> Could not execute application.
> >> at
> >>
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >> ~[?:1.8.0_161]
> >> at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> at
> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> [?:1.8.0_161]
> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> [?:1.8.0_161]
> >> at
> >>
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
> >> [qile-data-flow-1.0.jar:?]
> >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> [qile-data-flow-1.0.jar:?]
> >> Caused by:
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> Could not execute application.
> >> ... 11 more
> >> Caused by: org.apache.flink.client.program.ProgramInvocationException:
> The
> >> main method caused an error: Unable to create a sink for writing table
> >> 'default_catalog.default_database.hive_table1'.
> >>
> >> Table options are:
> >>
> >> 'connector'='filesystem'
> >> 'hive.storage.file-format'='parquet'
> >> 'is_generic'='false'
> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> 'sink.partition-commit.delay'='0s'
> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]

Re:Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了,
如果是default Dialect创建的表,是不是只是在临时会话有效

















在 2020-07-13 19:27:44,"Jingsong Li"  写道:
>Hi,
>
>问题一:
>
>只要current catalog是HiveCatalog。
>理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
>
>明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
>
>问题二:
>
>用filesystem创建出来的是filesystem的表,它和hive
>metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
>
>filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
>但是它的partition commit是不支持metastore的,所以不会有自动add
>partition到hive的默认实现,你需要自定义partition-commit-policy.
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:
>
>> 尴尬
>> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
>> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
>> 还有两个问题问下,
>> 问题1:
>> 创建的kafka_table,在hive和Flink
>> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
>>
>>
>>
>>
>>
>>
>> 问题2:
>> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> Could not execute application.
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> ~[?:1.8.0_161]
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> [?:1.8.0_161]
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> [?:1.8.0_161]
>> at
>> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>> [qile-data-flow-1.0.jar:?]
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [qile-data-flow-1.0.jar:?]
>> Caused by:
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> Could not execute application.
>> ... 11 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
>> main method caused an error: Unable to create a sink for writing table
>> 'default_catalog.default_database.hive_table1'.
>>
>> Table options are:
>>
>> 'connector'='filesystem'
>> 'hive.storage.file-format'='parquet'
>> 'is_generic'='false'
>> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> 'sink.partition-commit.delay'='0s'
>> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> ... 10 more
>> Caused by: org.apache.flink.table.api.ValidationException: Unable to
>> create a sink for 

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-13 文章 Jark Wu
你可以在 mysqlSinkFunction 中攒 buffer,在 timer trigger 或者 checkpoint 时 flush
mysql database,以及 output。

On Mon, 13 Jul 2020 at 15:36, jindy_liu <286729...@qq.com> wrote:

>
>
> 如果可以chain在一起,这个可以保证顺序性,我去试试。
>
> 这里再追问下,实际中,如里单流源里的数据也要顺序处理,可以设置并行度为1;
>
> 这里可能也要考虑下mysqlSinkFunction里的sink效率问题,需要积累一些数据再sink,这里可以做到跟kafka
> sink的batch_size和linger.ms配合联动吗?比如满1000条,或超过1s,就同时sink?
>
> 谢谢~
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
Hi,

问题一:

只要current catalog是HiveCatalog。
理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.

明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?

问题二:

用filesystem创建出来的是filesystem的表,它和hive
metastore是没有关系的,你需要使用创建filesystem表的语法[1]。

filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
但是它的partition commit是不支持metastore的,所以不会有自动add
partition到hive的默认实现,你需要自定义partition-commit-policy.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html

Best,
Jingsong

On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:

> 尴尬
> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
> 还有两个问题问下,
> 问题1:
> 创建的kafka_table,在hive和Flink
> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
>
>
>
>
>
>
> 问题2:
> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
> java.util.concurrent.CompletionException:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> ~[?:1.8.0_161]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_161]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_161]
> at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
> [qile-data-flow-1.0.jar:?]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [qile-data-flow-1.0.jar:?]
> Caused by:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
> ... 11 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> main method caused an error: Unable to create a sink for writing table
> 'default_catalog.default_database.hive_table1'.
>
> Table options are:
>
> 'connector'='filesystem'
> 'hive.storage.file-format'='parquet'
> 'is_generic'='false'
> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> 'sink.partition-commit.delay'='0s'
> 'sink.partition-commit.policy.kind'='metastore,success-file'
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> ... 10 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create a sink for writing table
> 'default_catalog.default_database.hive_table1'.
>
> Table options are:
>
> 'connector'='filesystem'
> 'hive.storage.file-format'='parquet'
> 'is_generic'='false'
> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> 'sink.partition-commit.delay'='0s'
> 

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Jark Wu
能贴下完整的异常栈么?

Btw,TableEnvironment上的 connect API 目前不建议使用,有许多已知的问题和缺失的 feature,建议用
executeSql(ddl) 来替代。
社区计划在 1.12 中系统地重构和修复 connect API 。

Best,
Jark

On Mon, 13 Jul 2020 at 17:24, Hito Zhu  wrote:

> 使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定
> createTemporaryTable
> 为事件时间,程序包 Field null does not exist 错误,是我用法有问题?
> 看了下  https://issues.apache.org/jira/browse/FLINK-16160
>    这个 issue 是解决的这个问题吗?
>
> tableEnv.connect(kafka)
> .withSchema(
>   new Schema().field("searchTime",
> DataTypes.TIMESTAMP()).rowtime(rowtime);
> )
> .withFormat(
> new Json().failOnMissingField(false)
> )
> .createTemporaryTable("tablename");
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11写入mysql问题

2020-07-13 文章 Jark Wu
请问你是怎么提交的作业呢? 是在本地 IDEA 里面执行的,还是打成 jar 包后提交到集群运行的呢?

On Mon, 13 Jul 2020 at 17:58, 小学霸  wrote:

> 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> source="""
> CREATE TABLE kafka_source_tab (
> id VARCHAR, 
> alarm_id VARCHAR, 
> trck_id VARCHAR
>
>
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alarm_test_g', 
> 'scan.startup.mode' = 'earliest-offset',
> 'properties.bootstrap.servers' = '10.2.2.73:2181',
> 'properties.bootstrap.servers' = '10.2.2.73:9092',
> 'format' = 'json'
> )
> """
>
> sink="""
> CREATE TABLE g_source_tab (
> id VARCHAR, 
> alarm_id VARCHAR,  
> trck_id VARCHAR
>
>
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
> 'table-name' = 'g', 
> 'username' = 'root',
> 'password' = '123456t',
> 'sink.buffer-flush.interval' = '1s'
> )
> """
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> env.set_parallelism(1)
> env_settings =
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> t_env = StreamTableEnvironment.create(env,
> environment_settings=env_settings)
>
>
>
> t_env.execute_sql(source)
> t_env.execute_sql(sink)
>
>
> source = t_env.from_path("kafka_source_tab")\
> .select("id,alarm_id,trck_id")
> source.execute_insert("g_source_tab")


Re:Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
尴尬
我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅 
这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
还有两个问题问下,
问题1:
创建的kafka_table,在hive和Flink 
SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore






问题2:
刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_161]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_161]
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
 [qile-data-flow-1.0.jar:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 [qile-data-flow-1.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[qile-data-flow-1.0.jar:?]
Caused by: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Unable to create a sink for writing table 
'default_catalog.default_database.hive_table1'.

Table options are:

'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a 
sink for writing table 'default_catalog.default_database.hive_table1'.

Table options are:

'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 

回复: Flink es7 connector认证问题

2020-07-13 文章 李宇彬
感谢,已找到问题原因,这个provider变量应该放到setHttpClientConfigCallback内部,之前是作为私有成员变量transient声明的,会导致taskmanager无法拿到认证信息
String user = pt.get("es.user.name");
String password = pt.get("es.user.password");
esSinkBuilder.setRestClientFactory(
(RestClientBuilder restClientBuilder) ->
restClientBuilder
.setHttpClientConfigCallback(httpClientBuilder 
->
{
CredentialsProvider provider = new 
BasicCredentialsProvider();
provider.setCredentials(AuthScope.ANY,
new 
UsernamePasswordCredentials(user, password));
httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
)
);
在2020年7月13日 15:33,Yangze Guo 写道:
Hi,

请问您有检查过pt.get("es.user.name"),
pt.get("es.user.password")这两个参数读出来是否都是正确的,另外更完整的错误栈方便提供下么?

Best,
Yangze Guo

On Mon, Jul 13, 2020 at 3:10 PM 李宇彬  wrote:

各位好,
请教一个问题
我们生产环境的es7是有用户名密码认证的,使用如下代码启动后会报,这段代码调用了es rest client api,单独使用是没问题的,不过放到 flink 
里就报错了
org.elasticsearch.client.ResponseException: method [HEAD], host [xxx], URI [/], 
status line [HTTP/1.1 401 Unauthorized]
ParameterTool pt = ParameterTool.fromArgs(args);
String confFile = pt.get("confFile");
pt = ParameterTool.fromPropertiesFile(new File(confFile));
provider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(pt.get("es.user.name"), 
pt.get("es.user.password")));

esSinkBuilder.setRestClientFactory(
(RestClientBuilder restClientBuilder) ->
restClientBuilder
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setSocketTimeout(18)
.setConnectionRequestTimeout(1)
)
.setHttpClientConfigCallback(httpClientBuilder ->
{
httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
)
);


Re: flink 1.11 es未定义pk的sink问题

2020-07-13 文章 Leonard Xu
HI,fulin

如 Yangze所说,这是es6 new connector 引入的一个bug,  你可以使用用old 
connector的语法绕过,就是connector.type=’xx’ ,这样代码路径还走之前的代码, 或者使用es7 nconnector。

祝好,
Leonard Xu

> 在 2020年7月13日,17:19,Yangze Guo  写道:
> 
> 验证了一下,这确实是一个bug,原因出在这一行[1]。我会提一个ticket来解决它,争取在1.11.1修复。
> 
> [1] 
> https://github.com/apache/flink/blob/0fbea46ac0271dd84fa8acd7f99f449a9a0d458c/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java#L285
> 
> Best,
> Yangze Guo
> 
> On Mon, Jul 13, 2020 at 3:44 PM sunfulin  wrote:
>> 
>> hi,YangZe,Leonard,
>> 我增加了一个可以复现问题的测试类,可以执行下看看。可以明显观察到,两个sink在有PK时写入正常,在没有PK时只有一条记录(id是索引名)。
>> 
>> import org.apache.flink.api.common.typeinfo.Types;
>> 
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> 
>> import 
>> org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>> 
>> import org.apache.flink.table.api.EnvironmentSettings;
>> 
>> import org.apache.flink.table.api.StatementSet;
>> 
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> 
>> import org.apache.flink.types.Row;
>> 
>> 
>> import static org.apache.flink.table.api.Expressions.$;
>> 
>> 
>> public class ESNewJobTest {
>> 
>> 
>>//构建StreamExecutionEnvironment
>> 
>>public static final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> 
>>//构建EnvironmentSettings 并指定Blink Planner
>> 
>>private static final EnvironmentSettings bsSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> 
>> 
>>//构建StreamTableEnvironment
>> 
>>public static final StreamTableEnvironment tEnv = 
>> StreamTableEnvironment.create(env, bsSettings);
>> 
>> 
>>//DDL语句
>> 
>>public static final String ES_SINK_DDL_NO_PK = "CREATE TABLE 
>> es_sink_test_no_pk (\n" +
>> 
>>"  idx integer,\n" +
>> 
>>"  firstx varchar\n" +
>> 
>>") WITH (\n" +
>> 
>>"'connector' = 'elasticsearch-6',\n" +
>> 
>>"'hosts' = '168.61.113.171:9200',\n" +
>> 
>>"'index' = 'es_sink_test_no_pk',\n" +
>> 
>>"'document-type' = 'default',\n" +
>> 
>>"'document-id.key-delimiter' = '$',\n" +
>> 
>>"'sink.bulk-flush.interval' = '1000',\n" +
>> 
>>"'failure-handler' = 'fail',\n" +
>> 
>>"'format' = 'json'\n" +
>> 
>>")";
>> 
>>public static final String ES_SINK_DDL_WITH_PK = "CREATE TABLE 
>> es_sink_test_with_pk (\n" +
>> 
>>"  idx integer,\n" +
>> 
>>"  firstx varchar,\n" +
>> 
>>"  primary key (idx, firstx) not enforced\n" +
>> 
>>") WITH (\n" +
>> 
>>"'connector' = 'elasticsearch-6',\n" +
>> 
>>"'hosts' = '168.61.113.171:9200',\n" +
>> 
>>"'index' = 'es_sink_test_with_pk',\n" +
>> 
>>"'document-type' = 'default',\n" +
>> 
>>"'document-id.key-delimiter' = '$',\n" +
>> 
>>"'sink.bulk-flush.interval' = '1000',\n" +
>> 
>>"'failure-handler' = 'fail',\n" +
>> 
>>"'format' = 'json'\n" +
>> 
>>")";
>> 
>> 
>>public static String getCharAndNumr(int length) {
>> 
>>StringBuffer valSb = new StringBuffer();
>> 
>>for (int i = 0; i < length; i++) {
>> 
>>String charOrNum = Math.round(Math.random()) % 2 == 0 ? "char" : 
>> "num"; // 输出字母还是数字
>> 
>>if ("char".equalsIgnoreCase(charOrNum)) {
>> 
>>// 字符串
>> 
>>int choice = Math.round(Math.random()) % 2 == 0 ? 65 : 97;  
>> // 取得大写字母还是小写字母
>> 
>>valSb.append((char) (choice + Math.round(Math.random()*25)));
>> 
>>} else if ("num".equalsIgnoreCase(charOrNum)) {
>> 
>>// 数字
>> 
>>valSb.append(String.valueOf(Math.round(Math.random()*9)));
>> 
>>}
>> 
>>}
>> 
>>return valSb.toString();
>> 
>> 
>>}
>> 
>> 
>>public static void main(String[] args) throws Exception {
>> 
>> 
>>DataStream ds = env.addSource(new 
>> RichParallelSourceFunction() {
>> 
>> 
>>volatile boolean flag = true;
>> 
>> 
>>@Override
>> 
>>public void run(SourceContext ctx) throws Exception {
>> 
>>while (flag) {
>> 
>>Row row = new Row(2);
>> 
>>row.setField(0, 2207);
>> 
>>row.setField(1, getCharAndNumr(4));
>> 
>>ctx.collect(row);
>> 
>>Thread.sleep(1000);
>> 
>>}
>> 
>> 
>>}
>> 
>> 
>>@Override
>> 
>>public void cancel() {
>> 
>>flag = false;
>> 
>>}
>> 
>>}).setParallelism(1).returns(Types.ROW(Types.INT, Types.STRING));
>> 
>> 
>> 
>>//ES sink测试ddl
>> 

flink 1.11????mysql????

2020-07-13 文章 ??????
flink??Kafka??mysqlmysql??
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
id VARCHAR, 
alarm_id VARCHAR, 
trck_id VARCHAR


) WITH (
'connector' = 'kafka',
'topic' = 'alarm_test_g', 
'scan.startup.mode' = 'earliest-offset', 
'properties.bootstrap.servers' = '10.2.2.73:2181',
'properties.bootstrap.servers' = '10.2.2.73:9092',
'format' = 'json' 
)
"""

sink="""
CREATE TABLE g_source_tab (
id VARCHAR,  
alarm_id VARCHAR,  
trck_id VARCHAR


) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
'table-name' = 'g', 
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
.select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")

回复:退订

2020-07-13 文章 苑士旸
谢谢,已经找到


| |
yuanshiyang
|
|
邮箱yuanshiy...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月13日 17:55,Jingsong Li 写道:
Hi

退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org

Best
Jingsong

On Mon, Jul 13, 2020 at 5:53 PM 苑士旸  wrote:

>
>
>
> | |
> yuanshiyang
> |
> |
> 邮箱yuanshiy...@163.com
> |
>
> 签名由 网易邮箱大师 定制



--
Best, Jingsong Lee


Re: pyflink问题求助

2020-07-13 文章 Xingbo Huang
Hi hieule,
This work around method is used in flink 1.10, in flink 1.11 you can use
ddl directly (blink planner) which you can refer to [1].
For how to use blink planner in PyFlink, you can refer to following code:

t_env = BatchTableEnvironment.create(
environment_settings=EnvironmentSettings.new_instance()
.in_batch_mode().use_blink_planner().build())

t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

Best,
Xingbo

hieule  于2020年7月13日周一 下午4:46写道:

>  hello Xingbo Huang,
>
> when I run , I had some error
>
> `TypeError: Could not found the Java class
> 'org.apache.flink.api.java.io.jdbc.JDBCAppendTableSinkBuilder'. The Java
> dependencies could be specified via command line argument '--jarfile' or
> the
> config option 'pipeline.jars' `
>
>
> how to solve issue ?
>
> Thank
> hieule
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 退订

2020-07-13 文章 Jingsong Li
Hi

退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org

Best
Jingsong

On Mon, Jul 13, 2020 at 5:53 PM 苑士旸  wrote:

>
>
>
> | |
> yuanshiyang
> |
> |
> 邮箱yuanshiy...@163.com
> |
>
> 签名由 网易邮箱大师 定制



-- 
Best, Jingsong Lee


Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
你把完整的程序再贴下呢

Best,
Jingsong

On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach  wrote:

> Hi,
>
>
> 我现在改成了:
> 'sink.partition-commit.delay'='0s'
>
>
> checkpoint完成了20多次,hdfs文件也产生了20多个,
> hive表还是查不到数据
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 17:23:34,"夏帅"  写道:
>
> 你好,
> 你设置了1个小时的
> SINK_PARTITION_COMMIT_DELAY
>
>
> --
> 发件人:Zhou Zach 
> 发送时间:2020年7月13日(星期一) 17:09
> 收件人:user-zh 
> 主 题:Re:Re: Re: Table options do not contain an option key 'connector' for
> discovering a connector.
>
>
> 开了checkpoint,
> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamExecutionEnv.enableCheckpointing(5 * 1000,
> CheckpointingMode.EXACTLY_ONCE)
> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>
>
>
>
> 间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 16:52:16,"Jingsong Li"  写道:
> >有开checkpoint吧?delay设的多少?
> >
> >Add partition 在 checkpoint完成 + delay的时间后
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach  wrote:
> >
> >> Hi,
> >> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add
> >> partition到hive表吗,我当前设置了参数
> >> 'sink.partition-commit.policy.kind'='metastore'
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> At 2020-07-13 15:01:28, "Jingsong Li"  wrote:
> >> >Hi,
> >> >
> >> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
> >> >
> >> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach  wrote:
> >> >
> >> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置
> >> >>
> >>
> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
> >> >> h','sink.partition-commit.policy.kind'='success-file');
> >> >> 也报错误
> >> >> query:
> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |
> >> >> |CREATE TABLE hive_table (
> >> >> |  user_id STRING,
> >> >> |  age INT
> >> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
> >> >> TBLPROPERTIES (
> >> >> |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >> >> |  'sink.partition-commit.trigger'='partition-time',
> >> >> |  'sink.partition-commit.delay'='1 h',
> >> >> |  'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >> |)
> >> >> |
> >> >> |""".stripMargin)
> >> >>
> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |CREATE TABLE kafka_table (
> >> >> |uid VARCHAR,
> >> >> |-- uid BIGINT,
> >> >> |sex VARCHAR,
> >> >> |age INT,
> >> >> |created_time TIMESTAMP(3),
> >> >> |WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >> >> |) WITH (
> >> >> |'connector.type' = 'kafka',
> >> >> |'connector.version' = 'universal',
> >> >> | 'connector.topic' = 'user',
> >> >> |-- 'connector.topic' = 'user_long',
> >> >> |'connector.startup-mode' = 'latest-offset',
> >> >> |'connector.properties.zookeeper.connect' =
> >> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
> >> >> |'connector.properties.bootstrap.servers' =
> >> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
> >> >> |'connector.properties.group.id' = 'user_flink',
> >> >> |'format.type' = 'json',
> >> >> |'format.derive-schema' = 'true'
> >> >> |)
> >> >> |""".stripMargin)
> >> >>
> >> >>
> >> >>
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |INSERT INTO hive_table
> >> >> |SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'),
> >> >> DATE_FORMAT(created_time, 'HH')
> >> >> |FROM kafka_table
> >> >> |
> >> >> |""".stripMargin)
> >> >>
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
> >> >> |
> >> >> |""".stripMargin)
> >> >> .print()
> >> >> 错误栈:
> >> >> Exception in thread "main"
> >> org.apache.flink.table.api.ValidationException:
> >> >> Unable to create a sink for writing table
> >> >> 'default_catalog.default_database.hive_table'.
> >> >>
> >> >> Table options are:
> >> >>
> >> >> 'hive.storage.file-format'='parquet'
> >> >> 'is_generic'='false'
> >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> >> 'sink.partition-commit.delay'='1 h'
> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >> 'sink.partition-commit.trigger'='partition-time'
> >> >> at
> >> >>
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> >> >> at
> >> >>
> >>
> 

退订

2020-07-13 文章 苑士旸



| |
yuanshiyang
|
|
邮箱yuanshiy...@163.com
|

签名由 网易邮箱大师 定制

flink 1.11????mysql????

2020-07-13 文章 ??????
flink??Kafka??mysqlmysql??
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
id VARCHAR, 
alarm_id VARCHAR, 
trck_id VARCHAR


) WITH (
'connector' = 'kafka',
'topic' = 'alarm_test_g', 
'scan.startup.mode' = 'earliest-offset', 
'properties.bootstrap.servers' = '10.2.2.73:2181',
'properties.bootstrap.servers' = '10.2.2.73:9092',
'format' = 'json' 
)
"""

sink="""
CREATE TABLE g_source_tab (
id VARCHAR,  
alarm_id VARCHAR,  
trck_id VARCHAR


) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
'table-name' = 'g', 
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
.select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")

Re:回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
Hi,


我现在改成了:
'sink.partition-commit.delay'='0s'


checkpoint完成了20多次,hdfs文件也产生了20多个,
hive表还是查不到数据













在 2020-07-13 17:23:34,"夏帅"  写道:

你好,
你设置了1个小时的
SINK_PARTITION_COMMIT_DELAY


--
发件人:Zhou Zach 
发送时间:2020年7月13日(星期一) 17:09
收件人:user-zh 
主 题:Re:Re: Re: Table options do not contain an option key 'connector' for 
discovering a connector.


开了checkpoint,
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)




间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据














在 2020-07-13 16:52:16,"Jingsong Li"  写道:
>有开checkpoint吧?delay设的多少?
>
>Add partition 在 checkpoint完成 + delay的时间后
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach  wrote:
>
>> Hi,
>> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add
>> partition到hive表吗,我当前设置了参数
>> 'sink.partition-commit.policy.kind'='metastore'
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2020-07-13 15:01:28, "Jingsong Li"  wrote:
>> >Hi,
>> >
>> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
>> >
>> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach  wrote:
>> >
>> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置
>> >>
>> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
>> >> h','sink.partition-commit.policy.kind'='success-file');
>> >> 也报错误
>> >> query:
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |
>> >> |CREATE TABLE hive_table (
>> >> |  user_id STRING,
>> >> |  age INT
>> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
>> >> TBLPROPERTIES (
>> >> |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>> >> |  'sink.partition-commit.trigger'='partition-time',
>> >> |  'sink.partition-commit.delay'='1 h',
>> >> |  'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> |)
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |CREATE TABLE kafka_table (
>> >> |uid VARCHAR,
>> >> |-- uid BIGINT,
>> >> |sex VARCHAR,
>> >> |age INT,
>> >> |created_time TIMESTAMP(3),
>> >> |WATERMARK FOR created_time as created_time - INTERVAL '3'
>> SECOND
>> >> |) WITH (
>> >> |'connector.type' = 'kafka',
>> >> |'connector.version' = 'universal',
>> >> | 'connector.topic' = 'user',
>> >> |-- 'connector.topic' = 'user_long',
>> >> |'connector.startup-mode' = 'latest-offset',
>> >> |'connector.properties.zookeeper.connect' =
>> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >> |'connector.properties.bootstrap.servers' =
>> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >> |'connector.properties.group.id' = 'user_flink',
>> >> |'format.type' = 'json',
>> >> |'format.derive-schema' = 'true'
>> >> |)
>> >> |""".stripMargin)
>> >>
>> >>
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |INSERT INTO hive_table
>> >> |SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'),
>> >> DATE_FORMAT(created_time, 'HH')
>> >> |FROM kafka_table
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
>> >> |
>> >> |""".stripMargin)
>> >> .print()
>> >> 错误栈:
>> >> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException:
>> >> Unable to create a sink for writing table
>> >> 'default_catalog.default_database.hive_table'.
>> >>
>> >> Table options are:
>> >>
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='1 h'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> 'sink.partition-commit.trigger'='partition-time'
>> >> at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> 

flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定 createTemporaryTable
为事件时间,程序包 Field null does not exist 错误,是我用法有问题?
看了下  https://issues.apache.org/jira/browse/FLINK-16160
   这个 issue 是解决的这个问题吗?

tableEnv.connect(kafka)
.withSchema(
  new Schema().field("searchTime",
DataTypes.TIMESTAMP()).rowtime(rowtime);
)
.withFormat(
new Json().failOnMissingField(false)
)
.createTemporaryTable("tablename");



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 夏帅
你好,
你设置了1个小时的
SINK_PARTITION_COMMIT_DELAY


--
发件人:Zhou Zach 
发送时间:2020年7月13日(星期一) 17:09
收件人:user-zh 
主 题:Re:Re: Re: Table options do not contain an option key 'connector' for 
discovering a connector.

开了checkpoint,
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)




间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据














在 2020-07-13 16:52:16,"Jingsong Li"  写道:
>有开checkpoint吧?delay设的多少?
>
>Add partition 在 checkpoint完成 + delay的时间后
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach  wrote:
>
>> Hi,
>> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add
>> partition到hive表吗,我当前设置了参数
>> 'sink.partition-commit.policy.kind'='metastore'
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2020-07-13 15:01:28, "Jingsong Li"  wrote:
>> >Hi,
>> >
>> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
>> >
>> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach  wrote:
>> >
>> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置
>> >>
>> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
>> >> h','sink.partition-commit.policy.kind'='success-file');
>> >> 也报错误
>> >> query:
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |
>> >> |CREATE TABLE hive_table (
>> >> |  user_id STRING,
>> >> |  age INT
>> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
>> >> TBLPROPERTIES (
>> >> |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>> >> |  'sink.partition-commit.trigger'='partition-time',
>> >> |  'sink.partition-commit.delay'='1 h',
>> >> |  'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> |)
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |CREATE TABLE kafka_table (
>> >> |uid VARCHAR,
>> >> |-- uid BIGINT,
>> >> |sex VARCHAR,
>> >> |age INT,
>> >> |created_time TIMESTAMP(3),
>> >> |WATERMARK FOR created_time as created_time - INTERVAL '3'
>> SECOND
>> >> |) WITH (
>> >> |'connector.type' = 'kafka',
>> >> |'connector.version' = 'universal',
>> >> | 'connector.topic' = 'user',
>> >> |-- 'connector.topic' = 'user_long',
>> >> |'connector.startup-mode' = 'latest-offset',
>> >> |'connector.properties.zookeeper.connect' =
>> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >> |'connector.properties.bootstrap.servers' =
>> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >> |'connector.properties.group.id' = 'user_flink',
>> >> |'format.type' = 'json',
>> >> |'format.derive-schema' = 'true'
>> >> |)
>> >> |""".stripMargin)
>> >>
>> >>
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |INSERT INTO hive_table
>> >> |SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'),
>> >> DATE_FORMAT(created_time, 'HH')
>> >> |FROM kafka_table
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
>> >> |
>> >> |""".stripMargin)
>> >> .print()
>> >> 错误栈:
>> >> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException:
>> >> Unable to create a sink for writing table
>> >> 'default_catalog.default_database.hive_table'.
>> >>
>> >> Table options are:
>> >>
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='1 h'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> 'sink.partition-commit.trigger'='partition-time'
>> >> at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> at
>> >>
>> 

flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定 createTemporaryTable
为事件时间,程序包 Field null does not exist 错误,是我用法有问题?
看了下  https://issues.apache.org/jira/browse/FLINK-16160
   这个 issue 是解决的这个问题吗?

tableEnv.connect(kafka)
.withSchema(
  new Schema().field("searchTime",
DataTypes.TIMESTAMP()).rowtime(rowtime);
)
.withFormat(
new Json().failOnMissingField(false)
)
.createTemporaryTable("tablename");



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
有没有设置 sink.partition-commit.delay?

Best,
Jingsong

On Mon, Jul 13, 2020 at 5:09 PM Zhou Zach  wrote:

> 开了checkpoint,
> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamExecutionEnv.enableCheckpointing(5 * 1000,
> CheckpointingMode.EXACTLY_ONCE)
> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>
>
>
>
> 间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 16:52:16,"Jingsong Li"  写道:
> >有开checkpoint吧?delay设的多少?
> >
> >Add partition 在 checkpoint完成 + delay的时间后
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach  wrote:
> >
> >> Hi,
> >> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add
> >> partition到hive表吗,我当前设置了参数
> >> 'sink.partition-commit.policy.kind'='metastore'
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> At 2020-07-13 15:01:28, "Jingsong Li"  wrote:
> >> >Hi,
> >> >
> >> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
> >> >
> >> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach  wrote:
> >> >
> >> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置
> >> >>
> >>
> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
> >> >> h','sink.partition-commit.policy.kind'='success-file');
> >> >> 也报错误
> >> >> query:
> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |
> >> >> |CREATE TABLE hive_table (
> >> >> |  user_id STRING,
> >> >> |  age INT
> >> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
> >> >> TBLPROPERTIES (
> >> >> |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >> >> |  'sink.partition-commit.trigger'='partition-time',
> >> >> |  'sink.partition-commit.delay'='1 h',
> >> >> |  'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >> |)
> >> >> |
> >> >> |""".stripMargin)
> >> >>
> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |CREATE TABLE kafka_table (
> >> >> |uid VARCHAR,
> >> >> |-- uid BIGINT,
> >> >> |sex VARCHAR,
> >> >> |age INT,
> >> >> |created_time TIMESTAMP(3),
> >> >> |WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >> >> |) WITH (
> >> >> |'connector.type' = 'kafka',
> >> >> |'connector.version' = 'universal',
> >> >> | 'connector.topic' = 'user',
> >> >> |-- 'connector.topic' = 'user_long',
> >> >> |'connector.startup-mode' = 'latest-offset',
> >> >> |'connector.properties.zookeeper.connect' =
> >> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
> >> >> |'connector.properties.bootstrap.servers' =
> >> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
> >> >> |'connector.properties.group.id' = 'user_flink',
> >> >> |'format.type' = 'json',
> >> >> |'format.derive-schema' = 'true'
> >> >> |)
> >> >> |""".stripMargin)
> >> >>
> >> >>
> >> >>
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |INSERT INTO hive_table
> >> >> |SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'),
> >> >> DATE_FORMAT(created_time, 'HH')
> >> >> |FROM kafka_table
> >> >> |
> >> >> |""".stripMargin)
> >> >>
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
> >> >> |
> >> >> |""".stripMargin)
> >> >> .print()
> >> >> 错误栈:
> >> >> Exception in thread "main"
> >> org.apache.flink.table.api.ValidationException:
> >> >> Unable to create a sink for writing table
> >> >> 'default_catalog.default_database.hive_table'.
> >> >>
> >> >> Table options are:
> >> >>
> >> >> 'hive.storage.file-format'='parquet'
> >> >> 'is_generic'='false'
> >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> >> 'sink.partition-commit.delay'='1 h'
> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >> 'sink.partition-commit.trigger'='partition-time'
> >> >> at
> >> >>
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> >>

Re: Re: Re: flink 1.11 es未定义pk的sink问题

2020-07-13 文章 Yangze Guo
验证了一下,这确实是一个bug,原因出在这一行[1]。我会提一个ticket来解决它,争取在1.11.1修复。

[1] 
https://github.com/apache/flink/blob/0fbea46ac0271dd84fa8acd7f99f449a9a0d458c/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java#L285

Best,
Yangze Guo

On Mon, Jul 13, 2020 at 3:44 PM sunfulin  wrote:
>
> hi,YangZe,Leonard,
> 我增加了一个可以复现问题的测试类,可以执行下看看。可以明显观察到,两个sink在有PK时写入正常,在没有PK时只有一条记录(id是索引名)。
>
> import org.apache.flink.api.common.typeinfo.Types;
>
> import org.apache.flink.streaming.api.datastream.DataStream;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import 
> org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>
> import org.apache.flink.table.api.EnvironmentSettings;
>
> import org.apache.flink.table.api.StatementSet;
>
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> import org.apache.flink.types.Row;
>
>
> import static org.apache.flink.table.api.Expressions.$;
>
>
> public class ESNewJobTest {
>
>
> //构建StreamExecutionEnvironment
>
> public static final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>
> //构建EnvironmentSettings 并指定Blink Planner
>
> private static final EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>
>
> //构建StreamTableEnvironment
>
> public static final StreamTableEnvironment tEnv = 
> StreamTableEnvironment.create(env, bsSettings);
>
>
> //DDL语句
>
> public static final String ES_SINK_DDL_NO_PK = "CREATE TABLE 
> es_sink_test_no_pk (\n" +
>
> "  idx integer,\n" +
>
> "  firstx varchar\n" +
>
> ") WITH (\n" +
>
> "'connector' = 'elasticsearch-6',\n" +
>
> "'hosts' = '168.61.113.171:9200',\n" +
>
> "'index' = 'es_sink_test_no_pk',\n" +
>
> "'document-type' = 'default',\n" +
>
> "'document-id.key-delimiter' = '$',\n" +
>
> "'sink.bulk-flush.interval' = '1000',\n" +
>
> "'failure-handler' = 'fail',\n" +
>
> "'format' = 'json'\n" +
>
> ")";
>
> public static final String ES_SINK_DDL_WITH_PK = "CREATE TABLE 
> es_sink_test_with_pk (\n" +
>
> "  idx integer,\n" +
>
> "  firstx varchar,\n" +
>
> "  primary key (idx, firstx) not enforced\n" +
>
> ") WITH (\n" +
>
> "'connector' = 'elasticsearch-6',\n" +
>
> "'hosts' = '168.61.113.171:9200',\n" +
>
> "'index' = 'es_sink_test_with_pk',\n" +
>
> "'document-type' = 'default',\n" +
>
> "'document-id.key-delimiter' = '$',\n" +
>
> "'sink.bulk-flush.interval' = '1000',\n" +
>
> "'failure-handler' = 'fail',\n" +
>
> "'format' = 'json'\n" +
>
> ")";
>
>
> public static String getCharAndNumr(int length) {
>
> StringBuffer valSb = new StringBuffer();
>
> for (int i = 0; i < length; i++) {
>
> String charOrNum = Math.round(Math.random()) % 2 == 0 ? "char" : 
> "num"; // 输出字母还是数字
>
> if ("char".equalsIgnoreCase(charOrNum)) {
>
> // 字符串
>
> int choice = Math.round(Math.random()) % 2 == 0 ? 65 : 97;  
> // 取得大写字母还是小写字母
>
> valSb.append((char) (choice + Math.round(Math.random()*25)));
>
> } else if ("num".equalsIgnoreCase(charOrNum)) {
>
> // 数字
>
> valSb.append(String.valueOf(Math.round(Math.random()*9)));
>
> }
>
> }
>
> return valSb.toString();
>
>
> }
>
>
> public static void main(String[] args) throws Exception {
>
>
> DataStream ds = env.addSource(new 
> RichParallelSourceFunction() {
>
>
> volatile boolean flag = true;
>
>
> @Override
>
> public void run(SourceContext ctx) throws Exception {
>
> while (flag) {
>
> Row row = new Row(2);
>
> row.setField(0, 2207);
>
> row.setField(1, getCharAndNumr(4));
>
> ctx.collect(row);
>
> Thread.sleep(1000);
>
> }
>
>
> }
>
>
> @Override
>
> public void cancel() {
>
> flag = false;
>
> }
>
> }).setParallelism(1).returns(Types.ROW(Types.INT, Types.STRING));
>
>
>
> //ES sink测试ddl
>
> tEnv.executeSql(ES_SINK_DDL_NO_PK);
>
> tEnv.executeSql(ES_SINK_DDL_WITH_PK);
>
>
> //source注册成表
>
> tEnv.createTemporaryView("test", ds, $("f0").as("idx"), 
> $("f1").as("firstx"), $("p").proctime());
>
>
> //sink写入
>
> StatementSet ss = tEnv.createStatementSet();
>
> ss.addInsertSql("insert into es_sink_test_no_pk select idx, firstx 
> from test");
>
> 

pyflink connect mysql

2020-07-13 文章 hieule
Hi , 
I has problem when submit job 
```
java.lang.AbstractMethodError: Method
org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.consumeDataSet(Lorg/apache/flink/api/java/DataSet;)Lorg/apache/flink/api/java/operators/DataSink;
is abstract
```


My code :

```
import logging
import os
import shutil
import sys
import tempfile

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig,TableSink
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
from pyflink.table.types import DataTypes
from pyflink.java_gateway import get_gateway
from pyflink.table.types import _to_java_type
from pyflink.util import utils

class JDBCAppendSink(TableSink):

def __init__(self, field_names: list, field_types: list, driver_name:
str, db_url: str,username: str, password: str, query: str):
gateway = get_gateway()
j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
j_field_types = utils.to_jarray(gateway.jvm.TypeInformation,
[_to_java_type(field_type) for
field_type in field_types])
builder =
gateway.jvm.org.apache.flink.api.java.io.jdbc.JDBCAppendTableSinkBuilder()
builder.setUsername(username)
builder.setPassword(password)
builder.setDrivername(driver_name)
builder.setDBUrl(db_url)
builder.setParameterTypes(j_field_types)
builder.setQuery(query)
j_jdbc_sink = builder.build()
j_jdbc_sink = j_jdbc_sink.configure(j_field_names, j_field_types)
super(JDBCAppendSink, self).__init__(j_jdbc_sink)

def word_count():
t_config = TableConfig()
env = ExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = BatchTableEnvironment.create(env, t_config)

# register Results table in table environment
tmp_dir = tempfile.gettempdir()
result_path = tmp_dir + '/result'
source_path = "/home/hieulm/code/data/table_orders.csv"
if os.path.exists(result_path):
try:
if os.path.isfile(result_path):
os.remove(result_path)
else:
shutil.rmtree(result_path)
except OSError as e:
logging.error("Error removing directory: %s - %s.", e.filename,
e.strerror)

logging.info("Read file source CSV: %s", source_path)
t_env.connect(FileSystem().path(source_path)) \
.with_format(OldCsv()
.field_delimiter(',')
.field("a", DataTypes.STRING())
.field("b", DataTypes.BIGINT())
.field("c", DataTypes.BIGINT()) 
.field("rowtime", DataTypes.STRING())) \
.with_schema(Schema()
.field("a", DataTypes.STRING())
.field("b", DataTypes.BIGINT())
.field("c", DataTypes.BIGINT()) 
.field("rowtime", DataTypes.STRING())) \
.create_temporary_table("orders")
logging.info("Results directory: DB bank_age")
t_env.register_table_sink("bank_age",
JDBCAppendSink(
["age", "count_age"],
[
DataTypes.BIGINT(),
DataTypes.BIGINT()],
"com.mysql.cj.jdbc.Driver",
"jdbc:mysql://localhost/flink",
"hieulm",
"Csda@123",
"insert into bank_age (age, count_age) values (?, ?)"
))

t_env.scan("orders") \
.group_by("b")\
.select("b as age, count(b) as count_age ") \
.insert_into("bank_age")

t_env.execute("bank_count_age")


if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format="%(message)s")

word_count()
```

How to solve problem ?


thank 
hieule




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
开了checkpoint,
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)




间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据














在 2020-07-13 16:52:16,"Jingsong Li"  写道:
>有开checkpoint吧?delay设的多少?
>
>Add partition 在 checkpoint完成 + delay的时间后
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach  wrote:
>
>> Hi,
>> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add
>> partition到hive表吗,我当前设置了参数
>> 'sink.partition-commit.policy.kind'='metastore'
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2020-07-13 15:01:28, "Jingsong Li"  wrote:
>> >Hi,
>> >
>> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
>> >
>> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach  wrote:
>> >
>> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置
>> >>
>> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
>> >> h','sink.partition-commit.policy.kind'='success-file');
>> >> 也报错误
>> >> query:
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |
>> >> |CREATE TABLE hive_table (
>> >> |  user_id STRING,
>> >> |  age INT
>> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
>> >> TBLPROPERTIES (
>> >> |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>> >> |  'sink.partition-commit.trigger'='partition-time',
>> >> |  'sink.partition-commit.delay'='1 h',
>> >> |  'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> |)
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |CREATE TABLE kafka_table (
>> >> |uid VARCHAR,
>> >> |-- uid BIGINT,
>> >> |sex VARCHAR,
>> >> |age INT,
>> >> |created_time TIMESTAMP(3),
>> >> |WATERMARK FOR created_time as created_time - INTERVAL '3'
>> SECOND
>> >> |) WITH (
>> >> |'connector.type' = 'kafka',
>> >> |'connector.version' = 'universal',
>> >> | 'connector.topic' = 'user',
>> >> |-- 'connector.topic' = 'user_long',
>> >> |'connector.startup-mode' = 'latest-offset',
>> >> |'connector.properties.zookeeper.connect' =
>> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >> |'connector.properties.bootstrap.servers' =
>> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >> |'connector.properties.group.id' = 'user_flink',
>> >> |'format.type' = 'json',
>> >> |'format.derive-schema' = 'true'
>> >> |)
>> >> |""".stripMargin)
>> >>
>> >>
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |INSERT INTO hive_table
>> >> |SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'),
>> >> DATE_FORMAT(created_time, 'HH')
>> >> |FROM kafka_table
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
>> >> |
>> >> |""".stripMargin)
>> >> .print()
>> >> 错误栈:
>> >> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException:
>> >> Unable to create a sink for writing table
>> >> 'default_catalog.default_database.hive_table'.
>> >>
>> >> Table options are:
>> >>
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='1 h'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> 'sink.partition-commit.trigger'='partition-time'
>> >> at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >> at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >> at
>> >> 

Re: Re: flink on yarn日志问题

2020-07-13 文章 Yangze Guo
1. 
我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html

Best,
Yangze Guo


On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
>
> 不好意思  怪我灭有描述清楚
> 1 目前开启日志收集功能
> 2 目前已是 per-job模式
> 3 集群使用cdh flink.1.10
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
> >Hi,
> >
> >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> >
> >第二个问题,您可以尝试一下per-job mode [2][3]
> >
> >[1] 
> >https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> >[2] 
> >https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> >[3] 
> >https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> >
> >
> >Best,
> >Yangze Guo
> >
> >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
> >>
> >> 请问一下两个问题
> >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看 ,除了使用es收集日志的这种方案, 
> >> 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在, 有没有好的方式或者策略 ,  
> >>  可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
> >>


  1   2   >