Re: Missing image tag in apache/flink repository ?

2022-11-15 Thread godfrey he
Thanks for reporting this, I will resolve it ASAP.

Best,
Godfrey

Alon Halimi via user  于2022年11月15日周二 16:46写道:
>
> Hello :)
>
>
>
> It seems the tag “apache/flink:1.16.0-scala_2.12” is missing – I get the 
> following error:
>
>
>
> failed to pull and unpack image "docker.io/apache/flink:1.16.0-scala_2.12"
>
>
>
>
>
> note that:
>
> (1) /apache/flink:1.16.0-scala_2.12 (without the 0 version suffix ) does exist
>
> (2) /flink:1.16.0-scala_2.12 (without apache prefix )does exist
>
>
>
> Thanks in advance
>
>
>
> Alon Halimi
>
> This message is intended only for the designated recipient(s). It may contain 
> confidential or proprietary information. If you are not the designated 
> recipient, you may not review, copy or distribute this message. If you have 
> mistakenly received this message, please notify the sender by a reply e-mail 
> and delete this message. Thank you.
>
> This message has been scanned for malware by Forcepoint. www.forcepoint.com


Re: Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误

2022-05-27 Thread godfrey he
使用了什么state backend?能描述一下产生上述问题的步骤吗?
是直接跑作业就产生上述错误,还是作业有基于sp启动,或者是中途重启过?

zhangbin  于2022年5月27日周五 13:34写道:

>
> Retry
>  回复的原邮件 
> 发件人 zhangbin 
> 发送日期 2022年05月27日 10:11
> 收件人 godfre...@gmail.com 
> 抄送人 user-zh 
> 主题 回复:Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误
> 确认了下邮件的内容的确是异常信息,这里却把附件的内容展示出来了。我再重新发一次。
>
> 我们在使用Flink-1.15.0 cumulate window + grouping sets 执行SQL过程中,发现个问题,报如下错误:
>java.lang.IllegalArgumentException: key group from 93 to 95 does not
> contain 478
>at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>at
> org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl.registerEventTimeWindowTimer(WindowTimerServiceImpl.java:61)
>at
> org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl.registerEventTimeWindowTimer(WindowTimerServiceImpl.java:27)
>at
> org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor.processElement(AbstractWindowAggProcessor.java:164)
>at
> org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:221)
>at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>at java.lang.Thread.run(Thread.java:748)
>
>


Re: Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误

2022-05-26 Thread godfrey he
把异常栈也发出来吧

zhangbin  于2022年5月26日周四 22:54写道:


Re: flink创建视图后,SELECT语句后使用OPTIONS报错

2022-02-17 Thread godfrey he
这个包:flink-table_2.11-1.13.0.jar (legacy planner 的包)

Best,
Godfrey

liangjinghong  于2022年2月17日周四 09:59写道:
>
> 感谢老师的回复,然而我的部署环境下的lib中没有您说的这个包,请问是要移除哪个包呢?
>
> 我的lib下有的包:
> flink-csv-1.13.0.jar
> flink-dist_2.11-1.13.0.jar
> flink-json-1.13.0.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-sql-connector-mysql-cdc-2.1.1.jar
> flink-table_2.11-1.13.0.jar
> flink-table-blink_2.11-1.13.0.jar
> log4j-1.2-api-2.12.1.jar
> log4j-api-2.12.1.jar
> log4j-core-2.12.1.jar
> log4j-slf4j-impl-2.12.1.jar
> -邮件原件-
> 发件人: godfrey he [mailto:godfre...@gmail.com]
> 发送时间: 2022年2月16日 16:47
> 收件人: user-zh 
> 主题: Re: flink创建视图后,SELECT语句后使用OPTIONS报错
>
> Hi liangjinghong,
>
> 原因是 blink planner 中引入并修改了 SqlTableRef 类, 而 Legacy planner 中没有引入 SqlTableRef
> 类,从而导致加载到了Calcite 中 SqlTableRef (该类有问题)。
> 解决方案:如果只使用到了blink planner,可以把legacy planner 的包冲lib下移除。
>
> Best,
> Godfrey
>
> liangjinghong  于2022年2月14日周一 17:26写道:
>
> > 各位老师们好,以下代码在开发环境中可以执行,打包部署后报错:
> >
> > 代码:
> >
> > CREATE VIEW used_num_common
> >
> > (toolName,region,type,flavor,used_num)
> >
> > AS
> >
> > select info.toolName as toolName,r.regionName as
> > region,f.type,f.flavor,count(1) as used_num from
> >
> > tbl_schedule_job/*+ OPTIONS('server-id'='1001-1031') */ job
> >
> > join
> >
> > tbl_schedule_task/*+ OPTIONS('server-id'='2001-2031') */ task
> >
> > on job.jobId = task.jobId
> >
> > join
> >
> > tbl_broker_node/*+ OPTIONS('server-id'='3001-3031') */  node
> >
> > on task.nodeId = node.id
> >
> > join
> >
> > tbl_app_info/*+ OPTIONS('server-id'='4001-4031') */ info
> >
> > on job.appId = info.appId
> >
> > join
> >
> > tbl_region r
> >
> > on node.region/*+ OPTIONS('server-id'='5001-5031') */ = r.region
> >
> > join
> >
> > tbl_flavor/*+ OPTIONS('server-id'='6001-6031') */  f
> >
> > on node.resourcesSpec = f.flavor
> >
> > where job.jobStatus in ('RUNNING','ERROR','INITING')
> >
> > and task.taskStatus in ('RUNNING','ERROR','INITING')
> >
> > and node.machineStatus <> 'DELETED'
> >
> > and toolName is not null
> >
> > group by info.toolName,r.regionName,f.type,f.flavor
> >
> > …
> >
> > 打包部署后报错如下:
> >
> > The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6:
> > SPECIAL
> >
> > 2022-02-08 13:33:39,350 WARN
> > org.apache.flink.client.deployment.application.DetachedApplicationRunn
> > er []
> > - Could not execute application:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: class org.apache.calcite.sql.SqlSyntax$6:
> > SPECIAL
> >
> > at
> > org.apache.flink.client.program.PackagedProgram.callMainMethod(Package
> > dProgram.java:372)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeF
> > orExecution(PackagedProgram.java:222)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:11
> > 4)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.client.deployment.application.DetachedApplicationRunn
> > er.tryExecuteJobs(DetachedApplicationRunner.java:84)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.client.deployment.application.DetachedApplicationRunn
> > er.run(DetachedApplicationRunner.java:70)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$hand
> > leRequest$0(JarRunHandler.java:102)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFutu
> > re.java:1700)
> > [?:?]
> >
> > at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515
> > )
> > [?:?]
> >
> > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> >
> > at
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.r
> > un(ScheduledThreadPoolExecutor.java:304)
> > [?:?]
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.j
> > ava:1128)
> > [?:?]
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.

Re: flink创建视图后,SELECT语句后使用OPTIONS报错

2022-02-16 Thread godfrey he
Hi liangjinghong,

原因是 blink planner 中引入并修改了 SqlTableRef 类, 而 Legacy planner 中没有引入 SqlTableRef
类,从而导致加载到了Calcite 中 SqlTableRef (该类有问题)。
解决方案:如果只使用到了blink planner,可以把legacy planner 的包冲lib下移除。

Best,
Godfrey

liangjinghong  于2022年2月14日周一 17:26写道:

> 各位老师们好,以下代码在开发环境中可以执行,打包部署后报错:
>
> 代码:
>
> CREATE VIEW used_num_common
>
> (toolName,region,type,flavor,used_num)
>
> AS
>
> select info.toolName as toolName,r.regionName as
> region,f.type,f.flavor,count(1) as used_num from
>
> tbl_schedule_job/*+ OPTIONS('server-id'='1001-1031') */ job
>
> join
>
> tbl_schedule_task/*+ OPTIONS('server-id'='2001-2031') */ task
>
> on job.jobId = task.jobId
>
> join
>
> tbl_broker_node/*+ OPTIONS('server-id'='3001-3031') */  node
>
> on task.nodeId = node.id
>
> join
>
> tbl_app_info/*+ OPTIONS('server-id'='4001-4031') */ info
>
> on job.appId = info.appId
>
> join
>
> tbl_region r
>
> on node.region/*+ OPTIONS('server-id'='5001-5031') */ = r.region
>
> join
>
> tbl_flavor/*+ OPTIONS('server-id'='6001-6031') */  f
>
> on node.resourcesSpec = f.flavor
>
> where job.jobStatus in ('RUNNING','ERROR','INITING')
>
> and task.taskStatus in ('RUNNING','ERROR','INITING')
>
> and node.machineStatus <> 'DELETED'
>
> and toolName is not null
>
> group by info.toolName,r.regionName,f.type,f.flavor
>
> …
>
> 打包部署后报错如下:
>
> The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6:
> SPECIAL
>
> 2022-02-08 13:33:39,350 WARN
> org.apache.flink.client.deployment.application.DetachedApplicationRunner []
> - Could not execute application:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
> [?:?]
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> [?:?]
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
>
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> Caused by: java.lang.UnsupportedOperationException: class
> org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>
> at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
> 

Re: sql-gateway和jdbc-driver还维护吗?

2022-01-11 Thread godfrey he
Hi Ada,

sql-gateway之前没有维护起来,确实是一个遗憾。
最近我们也关注到大家对batch的兴趣越来越浓,sql-gateway还会继续维护。

btw,非常欢迎分享一下你们使用Flink替换Spark遇到的一些痛点,我们会逐渐去解决这些痛点

Best,
Godfrey

Ada Wong  于2022年1月12日周三 10:09写道:
>
> cc tsreaper and Godfrey He
>
> 文末丶 <809097...@qq.com.invalid> 于2022年1月10日周一 19:39写道:
>
> >
> > 试下https://github.com/DataLinkDC/dlink 看看能不能满足你的需求
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:
> > "user-zh"   
> >  
> >  > 发送时间:2022年1月10日(星期一) 晚上7:32
> > 收件人:"user-zh" >
> > 主题:Re: sql-gateway和jdbc-driver还维护吗?
> >
> >
> >
> > https://github.com/ververica/flink-jdbc-driver
> > https://github.com/ververica/flink-sql-gateway
> >
> > Ada Wong  > 
> >  我看这俩项目一两年没更新了。想用Flink彻底替换到Spark,这俩项目是刚需,用来替换SparkThriftServer。


Re: 创建表t1的视图v1之后rowtime属性丢失

2021-11-02 Thread godfrey he
可以把具体的sql发出来看看

yidan zhao  于2021年11月2日周二 下午7:06写道:
>
> 如题,我原先基于flink1.11和1.12貌似没这个问题。目前基于1.13出现这个问题。
> 问题描述如下:
> 我t1是kafka表,其中有个属性是event_time是row time属性,然后创建了view v1,通过select  ,
> event_time from t1这样创建。  现在问题是这么创建之后,我基于v1查询报错说aggre.. window只能在time
> attributes上定义。
> 不清楚是版本变化导致,还是我其他地方搞错了呢。


Re: Re: Re: 公司数据密文,实现group by和join

2021-11-01 Thread godfrey he
上传的图片没法显示,通过图床工具或纯文本方式重新发一遍

lyh1067341434  于2021年11月1日周一 上午10:42写道:

> 您好!
>
> 这样好像还是不行,因为group by id ,id还是密文字符串,还是会把id当成字符串处理,所以还是不能正确分组;
> 为了更清楚表达,下面为图示:
>
> 谢谢您!
>
>
>
>
>
>
>
> 在 2021-10-29 10:49:35,"Caizhi Weng"  写道:
> >Hi!
> >
> >你是不是想写这样的 SQL:
> >
> >SELECT id, sum(price) AS total_price FROM (
> >  SELECT T1.id AS id, T2.price AS price FROM T AS T1 INNER JOIN T AS T2 ON
> >decrypt_udf(T1.id, T2.id) = 1
> >) GROUP BY id
> >
> >这个 sql 会输出每个 id 和该 id 属于的分组的总价格。
> >
> >lyh1067341434  于2021年10月29日周五 上午9:41写道:
> >
> >> 您好!
> >>
> >>
> >>   感谢您在百忙之中抽空回复我的邮件,我已经按照您的建议,自定义join函数实现了密文的join,但密文的group by 还是实现不了;
> >>
> >>
> >> 比如 有一张表 a, 表a有
> >> id,price列,数据都是密文,类似这样("MBwEELdR0JDC0OSryuQskeulP8YCCAyJLH7RwmAA");
> >>
> >>
> >> 如果我想求 不同id组的price之和:
> >> 直接使用flink 计算:会把id的分组当成字符串处理,从而导致分组的不正确;
> >> 如果调用密文计算的接口的话,把两个比较的key的密文传进入,会得到1或者0,来判断这两个密文key是否相等,从而分组可以正确;
> >>
> >>
> >>
> >>
> >> 问题:
> >>
> >>
> >> 目前group by分组,不知道在哪里实现调用密文计算的接口,从而传入两个key,来进行分组正确;
> >>我看到api只能指定分组的key是哪一个;
> >>
> >>
> >> 谢谢您!
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2021-10-28 11:09:26,"Caizhi Weng"  写道:
> >> >Hi!
> >> >
> >> >没太明白你的需求。你的需求是不是
> >> >
> >> >1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。
> >> >
> >> >如果是这个需求,只要实现一个 udf 即可。详见 [1]。
> >> >
> >> >2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。
> >> >
> >> >如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by
> >> >是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。
> >> >
> >> >[1]
> >> >
> >> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/
> >> >
> >> >lyh1067341...@163.com  于2021年10月27日周三 下午5:20写道:
> >> >
> >> >> 您好:
> >> >>
> >> 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group
> >> >> by,对于使用spark和flink算子不知道如何实现。
> >> >>
> >> >> 问题:
> >> >> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口)
> >> >>
> >> >> 谢谢您。
> >> >>
> >> >>
> >> >>
> >> >> 发自 网易邮箱大师
> >>
>
>
>
>
>


[ANNOUNCE] Apache Flink 1.11.4 released

2021-08-10 Thread godfrey he
The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.4, which is the fourth bugfix release for the Apache Flink 1.11
series.



Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.



The release is available for download at:

https://flink.apache.org/downloads.html



Please check out the release blog post for an overview of the improvements
for this bugfix release:

https://flink.apache.org/news/2021/08/09/release-1.11.4.html



The full release notes are available in Jira:

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349404



We would like to thank all contributors of the Apache Flink community who
made this release possible!



Regards,

Godfrey


Re: k8s session模式SQLclient怎样连接

2021-07-23 Thread godfrey he
我建了一个jira,建议sql client把作业提交到各种集群的方式在文档里写清楚,
可以关注 https://issues.apache.org/jira/browse/FLINK-23483

Best,
Godfrey


Caizhi Weng  于2021年7月23日周五 上午10:12写道:

> Hi!
>
> 可以考虑把 k8s session 的 flink rest api 地址暴露出来,然后客户端把 execution.target 设为
> remote,rest.address 和 rest.port 设为相应地址。
>
> maker_d...@foxmail.com  于2021年7月22日周四 下午9:46写道:
>
> > 大家好,
> > 我将flink部署在k8s集群上,使用官方文档上的session模式进行部署,可以正常提交任务。
> > 现在我想使用sqlclient,在提交任务时提示 :
> > [ERROR] Could not execute SQL statement. Reason:
> > java.net.UnknownHostException: flink-cluster
> > 请问大家,如何使用sqlclient连接k8s上的flink session。
> > flink版本 1.12.4.
> >
> >
> >
> > maker_d...@foxmail.com
> >
>


Re: Lateral join not finding correlate variable

2020-11-19 Thread godfrey he
Hi Dylan,

I have reproduced your issue based on your code,
Currently Flink does not support such nested correlate pattern query.
I have created a issue to track this [1].
Thanks for your reporting and help.

[1] https://issues.apache.org/jira/browse/FLINK-20255

Best,
Godfrey

Dylan Forciea  于2020年11月19日周四 下午12:10写道:

> Godfrey,
>
>
>
> I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack
> trace running exactly this code:
>
>
>
> import org.apache.flink.api.scala._
>
> import org.apache.flink.core.fs.FileSystem.WriteMode
>
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> import org.apache.flink.table.api._
>
> import org.apache.flink.table.api.bridge.scala._
>
> import org.apache.flink.types.Row
>
> import org.apache.flink.table.annotation.FunctionHint
>
> import org.apache.flink.table.annotation.DataTypeHint
>
> import org.apache.flink.table.functions.TableFunction
>
>
>
>
>
> @FunctionHint(output = new DataTypeHint("ROW"))
>
> class SplitStringToRows extends TableFunction[Row] {
>
>   def eval(str: String, separator: String = ";"): Unit = {
>
> if (str != null) {
>
>   str.split(separator).foreach(s => collect(Row.of(s.trim(
>
> }
>
>   }
>
> }
>
> object Job {
>
>
>
>   def main(args: Array[String]): Unit = {
>
> val settings = EnvironmentSettings
> .newInstance().useBlinkPlanner().inStreamingMode().build()
>
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> val streamTableEnv = StreamTableEnvironment.create(streamEnv,
> settings)
>
>
>
> streamTableEnv.createTemporarySystemFunction(
>
>   "SplitStringToRows",
>
>   classOf[SplitStringToRows]
>
> ) // Class defined in previous email
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table1 (
>
> id_source BIGINT PRIMARY KEY,
>
> attr1_source STRING,
>
> attr2 STRING
>
>   ) WITH (
>
>'connector' = 'jdbc',
>
>'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
>
>'table-name' = '',
>
>'username' = '',
>
>'password' = '',
>
>'scan.fetch-size' = '500',
>
>'scan.auto-commit' = 'false')
>
> """)
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table2 (
>
> attr1_source STRING,
>
> attr2 STRING,
>
> attr3 DECIMAL,
>
> attr4 DATE
>
>   ) WITH (
>
>'connector' = 'jdbc',
>
>'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
>
>'table-name' = '',
>
>'username' = '',
>
>'password' = '',
>
>'scan.fetch-size' = '500',
>
>'scan.auto-commit' = 'false')
>
> """)
>
>
>
> val q1 = streamTableEnv.sqlQuery("""
>
>   SELECT
>
> id_source AS id,
>
> attr1_source AS attr1,
>
> attr2
>
>   FROM table1
>
> """)
>
> streamTableEnv.createTemporaryView("view1", q1)
>
>
>
> val q2 = streamTableEnv.sqlQuery(
>
>   """
>
> SELECT
>
>   a.attr1 AS attr1,
>
>   attr2,
>
>   attr3,
>
>   attr4
>
> FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source,
> ';')) AS a(attr1)
>
> """)
>
> streamTableEnv.createTemporaryView("view2", q2)
>
>
>
> val q3 = streamTableEnv.sqlQuery("""
>
> SELECT
>
>   w.attr1,
>
>   p.attr3
>
> FROM view1 w
>
> LEFT JOIN LATERAL (
>
>   SELECT
>
> attr1,
>
> attr3
>
>   FROM (
>
> SELECT
>
>   attr1,
>
>   attr3,
>
>   ROW_NUMBER() OVER (
>
> PARTITION BY attr1
>
> ORDER BY
>
>   attr4 DESC NULLS LAST,
>
>   w.attr2 = attr2 DESC NULLS LAST
>
>   ) AS row_num
>
>   FROM view2)
>
>   WHERE row_num = 1) p
>
> ON (w.attr1 = p.attr1)
>
> """)
>
> streamTableEnv.createTemporaryView("view3", q3)
>
>
>
> val view3 = streamTableEnv.from("view3")
>
>
>

Re: Lateral join not finding correlate variable

2020-11-18 Thread godfrey he
Dylan,

Thanks for you feedback, if the planner encounters
"unexpected correlate variable $cor2 in the plan" exception,
There's a high probability that FlinkDecorrelateProgram has some bugs
or the query pattern is not supported now. I try to use JDBC Connector as
the input tables,
but I still don't reproduce the exception. Could you provide your full
code, including ddl, query, etc.
Thanks so much.

Best,
Godfrey



Dylan Forciea  于2020年11月18日周三 下午10:09写道:

> Godfrey,
>
>
>
> I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and
> am still having the same issue. Note that I am using the JDBC Connector for
> the input tables, and table1 and table2 are actually created from queries
> on those connector tables and not directly.
>
>
>
> Since you indicated what I did should work, I played around a bit more,
> and determined it’s something inside of the table2 query that is triggering
> the error. The id field there is generated by a table function. Removing
> that piece made the plan start working. Table 2 is formulated as follows:
>
>
>
> SELECT
>
>   T.id,
>
>   attr2,
>   attr3,
>
>   attr4
>
> FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)
>
>
>
> Where SplitStringToRows is defined as:
>
>
>
> @FunctionHint(output = new DataTypeHint("ROW"))
>
> class SplitStringToRows extends TableFunction[Row] {
>
>
>
>   def eval(str: String, separator: String = ";"): Unit = {
>
> if (str != null) {
>
>   str.split(separator).foreach(s => collect(Row.of(s.trim(
>
> }
>
>   }
>
> }
>
>
>
> Removing the lateral table bit in that first table made the original query
> plan work correctly.
>
>
>
> I greatly appreciate your assistance!
>
>
>
> Regards,
>
> Dylan Forciea
>
>
>
> *From: *godfrey he 
> *Date: *Wednesday, November 18, 2020 at 7:33 AM
> *To: *Dylan Forciea 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Lateral join not finding correlate variable
>
>
>
> Hi Dylan,
>
>
>
> Could you provide which Flink version you find out the problem with?
>
> I test the above query on master, and I get the plan, no errors occur.
>
> Here is my test case:
>
> @Test
> def testLateralJoin(): Unit = {
>   *util*.addTableSource[(String, String, String, String, String)]("table1", 
> 'id, 'attr1, 'attr2, 'attr3, 'attr4)
>   *util*.addTableSource[(String, String, String, String, String)]("table2", 
> 'id, 'attr1, 'attr2, 'attr3, 'attr4)
>   val query =
> """
>   |SELECT
>   |  t1.id,
>   |  t1.attr1,
>   |  t2.attr2
>   |FROM table1 t1
>   |LEFT JOIN LATERAL (
>   |  SELECT
>   |id,
>   |attr2
>   |  FROM (
>   |SELECT
>   |  id,
>   |  attr2,
>   |  ROW_NUMBER() OVER (
>   |PARTITION BY id
>   |ORDER BY
>   |  attr3 DESC,
>   |  t1.attr4 = attr4 DESC
>   |  ) AS row_num
>   |FROM table2)
>   |WHERE row_num = 1) t2
>   |ON t1.id = t2.id
>   |""".stripMargin
>   *util*.verifyPlan(query)
> }
>
> Best,
>
> Godfrey
>
>
>
> Dylan Forciea  于2020年11月18日周三 上午7:44写道:
>
> This may be due to not understanding  lateral joins in Flink – perhaps you
> can only do so on temporal variables – but I figured I’d ask since the
> error message isn’t intuitive.
>
>
>
> I am trying to do a combination of a lateral join and a top N query. Part
> of my ordering is based upon whether the a value in the left side of the
> query matches up. I’m trying to do this in the general form of:
>
>
>
> SELECT
>
>   t1.id,
>
>   t1.attr1,
>
>   t2.attr2
>
> FROM table1 t1
>
> LEFT JOIN LATERAL (
>
>   SELECT
>
> id,
>
> attr2
>
>   FROM (
>
> SELECT
>
>   id,
>
>   attr2,
>
>   ROW_NUMBER() OVER (
>
> PARTITION BY id
> ORDER BY
>
>   attr3 DESC,
>
>   t1.attr4 = attr4 DESC
>
>   ) AS row_num
>
> FROM table2
>
> WHERE row_num = 1) t2
>
> ON (t1.id = t2.id)
>
>
>
> I am getting an error that looks like:
>
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> unexpected correlate variable $cor2 in the plan
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
>
>  at

Re: Lateral join not finding correlate variable

2020-11-18 Thread godfrey he
Hi Dylan,

Could you provide which Flink version you find out the problem with?
I test the above query on master, and I get the plan, no errors occur.
Here is my test case:

@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String,
String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String,
String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  val query =
"""
  |SELECT
  |  t1.id,
  |  t1.attr1,
  |  t2.attr2
  |FROM table1 t1
  |LEFT JOIN LATERAL (
  |  SELECT
  |id,
  |attr2
  |  FROM (
  |SELECT
  |  id,
  |  attr2,
  |  ROW_NUMBER() OVER (
  |PARTITION BY id
  |ORDER BY
  |  attr3 DESC,
  |  t1.attr4 = attr4 DESC
  |  ) AS row_num
  |FROM table2)
  |WHERE row_num = 1) t2
  |ON t1.id = t2.id
  |""".stripMargin
  util.verifyPlan(query)
}

Best,
Godfrey

Dylan Forciea  于2020年11月18日周三 上午7:44写道:

> This may be due to not understanding  lateral joins in Flink – perhaps you
> can only do so on temporal variables – but I figured I’d ask since the
> error message isn’t intuitive.
>
>
>
> I am trying to do a combination of a lateral join and a top N query. Part
> of my ordering is based upon whether the a value in the left side of the
> query matches up. I’m trying to do this in the general form of:
>
>
>
> SELECT
>
>   t1.id,
>
>   t1.attr1,
>
>   t2.attr2
>
> FROM table1 t1
>
> LEFT JOIN LATERAL (
>
>   SELECT
>
> id,
>
> attr2
>
>   FROM (
>
> SELECT
>
>   id,
>
>   attr2,
>
>   ROW_NUMBER() OVER (
>
> PARTITION BY id
> ORDER BY
>
>   attr3 DESC,
>
>   t1.attr4 = attr4 DESC
>
>   ) AS row_num
>
> FROM table2
>
> WHERE row_num = 1) t2
>
> ON (t1.id = t2.id)
>
>
>
> I am getting an error that looks like:
>
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> unexpected correlate variable $cor2 in the plan
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>
>  at
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>
>  at
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>
>  at scala.collection.Iterator.foreach(Iterator.scala:943)
>
>  at scala.collection.Iterator.foreach$(Iterator.scala:943)
>
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>
>  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>
>  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>
>  at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>
>  at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>
>  at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>
>  at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>
>  at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>
>  at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>
>  at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
>
>  at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>
>  at
> org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
>
>  at
> org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
>
>  at
> org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
>
>  at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
>
>  at io.oseberg.flink.well.ok.Job.main(Job.scala)
>
>
>
> The only other thing I can think of doing is creating a Table Aggregate
> function to pull this off. But, I wanted to check to make sure I wasn’t
> doing something wrong in the above first, or if there is something I’m not
> thinking of doing.
>
>
>
> Regards,
>
> Dylan Forciea
>


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread godfrey he
例如 calc merge rule,还有calc,agg等其他相关rule,点比较散。得具体看

jun su  于2020年9月23日周三 上午10:22写道:

> hi godfrey,
> 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的
>
> godfrey he  于2020年9月23日周三 上午10:09写道:
>
> > Hi Jun,
> >
> > 可能是old planner缺少一些rule导致遇到了corner case,
> > blink planner之前解过一些类似的案例。
> >
> > jun su  于2020年9月23日周三 上午9:53写道:
> >
> > > hi godfrey,
> > >
> > > 刚看了下, blink应该也会用hep , 上文说错了
> > >
> > > jun su  于2020年9月23日周三 上午9:19写道:
> > >
> > > > hi godfrey,
> > > > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > > >
> > > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > > >
> > > >> blink planner 有这个问题吗?
> > > >>
> > > >> jun su  于2020年9月22日周二 下午3:27写道:
> > > >>
> > > >> > hi all,
> > > >> >
> > > >> > 环境: flink-1.9.2 flink table planner
> > > >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > > >> >
> > > >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > > 导致进程OOM
> > > >> > ---
> > > >> > 代码:
> > > >> >
> > > >> > fbTableEnv.registerTableSource("source",orcTableSource)
> > > >> >
> > > >> > val select = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source
> > ")
> > > >> >
> > > >> > fbTableEnv.registerTable("selectTable",select)
> > > >> >
> > > >> > val t1 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > > selectTable
> > > >> > where Auth_Roles like 'a%'")
> > > >> > fbTableEnv.registerTable("t1",t1)
> > > >> >
> > > >> > val t2 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1
> where
> > > >> > Target_UserSid= 'b'")
> > > >> > fbTableEnv.registerTable("t2",t2)
> > > >> >
> > > >> > val t3 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2
> where
> > > >> > Thread_ID= 'c'")
> > > >> > fbTableEnv.registerTable("t3",t3)
> > > >> >
> > > >> > val t4 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3
> where
> > > >> > access_path= 'd'")
> > > >> > fbTableEnv.registerTable("t4",t4)
> > > >> >
> > > >> > val t5 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4
> where
> > > >> > action= 'e'")
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > Best,
> > > >> > Jun Su
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
>
>
> --
> Best,
> Jun Su
>


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread godfrey he
Hi Jun,

可能是old planner缺少一些rule导致遇到了corner case,
blink planner之前解过一些类似的案例。

jun su  于2020年9月23日周三 上午9:53写道:

> hi godfrey,
>
> 刚看了下, blink应该也会用hep , 上文说错了
>
> jun su  于2020年9月23日周三 上午9:19写道:
>
> > hi godfrey,
> > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> >
> > godfrey he  于2020年9月22日周二 下午8:58写道:
> >
> >> blink planner 有这个问题吗?
> >>
> >> jun su  于2020年9月22日周二 下午3:27写道:
> >>
> >> > hi all,
> >> >
> >> > 环境: flink-1.9.2 flink table planner
> >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> >> >
> >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> 导致进程OOM
> >> > ---
> >> > 代码:
> >> >
> >> > fbTableEnv.registerTableSource("source",orcTableSource)
> >> >
> >> > val select = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
> >> >
> >> > fbTableEnv.registerTable("selectTable",select)
> >> >
> >> > val t1 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> selectTable
> >> > where Auth_Roles like 'a%'")
> >> > fbTableEnv.registerTable("t1",t1)
> >> >
> >> > val t2 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> >> > Target_UserSid= 'b'")
> >> > fbTableEnv.registerTable("t2",t2)
> >> >
> >> > val t3 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> >> > Thread_ID= 'c'")
> >> > fbTableEnv.registerTable("t3",t3)
> >> >
> >> > val t4 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> >> > access_path= 'd'")
> >> > fbTableEnv.registerTable("t4",t4)
> >> >
> >> > val t5 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> >> > action= 'e'")
> >> >
> >> >
> >> >
> >> > --
> >> > Best,
> >> > Jun Su
> >> >
> >>
> >
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best,
> Jun Su
>


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 Thread godfrey he
blink planner 有这个问题吗?

jun su  于2020年9月22日周二 下午3:27写道:

> hi all,
>
> 环境: flink-1.9.2 flink table planner
> 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
>
>   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
> ---
> 代码:
>
> fbTableEnv.registerTableSource("source",orcTableSource)
>
> val select = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
>
> fbTableEnv.registerTable("selectTable",select)
>
> val t1 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
> where Auth_Roles like 'a%'")
> fbTableEnv.registerTable("t1",t1)
>
> val t2 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> Target_UserSid= 'b'")
> fbTableEnv.registerTable("t2",t2)
>
> val t3 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> Thread_ID= 'c'")
> fbTableEnv.registerTable("t3",t3)
>
> val t4 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> access_path= 'd'")
> fbTableEnv.registerTable("t4",t4)
>
> val t5 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> action= 'e'")
>
>
>
> --
> Best,
> Jun Su
>


Re: Flink 1.11 jdbc查pg失败

2020-09-17 Thread godfrey he
据我所知,目前flink是大小写不敏感,但是pg是大小写敏感。这问题暂时没法解

wdmcode  于2020年9月10日周四 上午9:44写道:

> Hi Jimmy
>
> 给字段加双引号试试呢
> Select “F1”,”F2” from xxx.xxx;
>
>
> 发件人: Jimmy Zhang
> 发送时间: Thursday, September 10, 2020 9:41 AM
> 收件人: user-zh@flink.apache.org
> 主题: Flink 1.11 jdbc查pg失败
>
> flink 1.11用jdbc查询pg表时,pg表的字段是大写 flink会把字段转成小写,而导致查询失败,有大佬知道这个问题吗
>
> Best,
> Jimmy Signature is customized by Netease Mail Master
>
>


Re: flink sql执行sql语句无法执行的错误-No operators defined in streaming topology. Cannot execute.

2020-09-17 Thread godfrey he
能提供完整的demo吗?

me  于2020年9月11日周五 下午6:54写道:

> 1.flink 版本是1.11.1
> streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamBlinkSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> streamTableEnv = StreamTableEnvironment.create(streamEnv,
> streamBlinkSettings)
>
> 2.我在执行sql后需要转为datastream所以最后使用的是dataStreamEnv.execute("SqlPlatformRealTime”)
> sql的结果Table会转为datastream然后addSink保存到kafka中。
>
>
>  原始邮件
> 发件人: silence
> 收件人: user-zh
> 发送时间: 2020年9月11日(周五) 18:49
> 主题: Re: flink sql执行sql语句无法执行的错误-No operators defined in streaming
> topology. Cannot execute.
>
>
> 没有insert语句也就是没有sink无法触发计算 -- Sent from:
> http://apache-flink.147419.n8.nabble.com/


Re: 多线程模式下使用Blink TableEnvironment

2020-09-17 Thread godfrey he
TableEnvironment 不是多线程安全的。

btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗?

Jeff Zhang  于2020年9月14日周一 下午12:10写道:

> 参考zeppelin的做法,每个线程里都调用这个
>
>
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
>
>
> jun su  于2020年9月14日周一 上午11:54写道:
>
> > hi all,
> >
> > 多线程模式下执行sql , 在非聚合sql时报了如下错误:
> >
> > Caused by: java.lang.NullPointerException
> >   at java.util.Objects.requireNonNull(Objects.java:203)
> >   at
> >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> >   at
> >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> >
> >
> >
> >
> 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> > 解决
> >
> >
> > 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix?
> >
> > Caused by: java.lang.NullPointerException
> > at scala.Predef$.Double2double(Predef.scala:365)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> > at
> >
> >
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> > at
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> > Source)
> > at
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> > Source)
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: flink hive批量作业报FileNotFoundException

2020-09-17 Thread godfrey he
cc @Rui Li 

李佳宸  于2020年9月14日周一 下午5:11写道:

> 大家好~我执行batch table的作业写入hive时,会出现FileNotFoundException的错误。找不到.staging文件
> 版本是1.11.1
> Caused by: java.io.FileNotFoundException: File
>
> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
> does not exist.
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0]
> at
>
> org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> 在standalone模式下没有这个问题,on yarn 的per job模式下部分job就会出现这个问题
>


Re: 关于flink cdc 测试时遇到的几种问题,比较疑惑,各位大佬求帮助

2020-09-17 Thread godfrey he
sql
client的默认并发为1,如果没有在sql-client-defaults.yaml显示设置parallelism,代码里面的默认并发为1.因此需要显示的设置
sql-client-defaults.yaml的parallelism

Jark Wu  于2020年9月15日周二 上午11:43写道:

> Hi,
>
> 请问
> 1. 有完整的异常栈吗? 你是怎么从 ck 恢复的呢? 用的什么命令?
> 2. 是的。因为 source 只能并发1。先写到 kafka,再从 kafka 同步是可以的。
>
> Best,
> Jark
>
> On Fri, 11 Sep 2020 at 17:56, 引领  wrote:
>
> >
> >
> > 1、在checkpoint后,用ck恢复时报错。
> > org.apache.kafka.connect.errors.ConnectException:
> >
> com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
> > Failed to deserialize data ofEventHeaderV4{timestamp=1599815908000,
> > eventType=EXT_UPDATE_ROWS, serverId=501, headerLength=19,
> dataLength=25879,
> > nextPosition=721073164, flags=0}
> > 2、关于flink cdc读取数据后,并执行join【加载维表的操作】后,写入mysql中。并发调不上去,一直是1
> > 我已在配置文件中做了相应的设置,包括sql-client中
> > taskmanager.numberOfTaskSlots: 5 # The parallelism used for
> > programs that did not specify and other parallelism.
> >  parallelism.default: 5
> >
> >
> > 我的sql是:
> >
> >
> > Insert into orders Select * from order o join sku s FOR SYSTEM_TIME
> as
> > of o.proc_time s  on o.sku_id = s.id
> >
> >
> > 提前感谢各位大佬回复
> >
> >
> >
> >
> >
> >
>


Re: Flink SQL create view问题

2020-09-17 Thread godfrey he
已知问题,已fix:https://issues.apache.org/jira/browse/FLINK-18750

guaishushu1...@163.com  于2020年9月16日周三 下午2:32写道:

> 当create_view和LATERAL TABLE 共用时 会出现字段找不到异常
>
> 语法:
> CREATE TABLE billing_data_test (
> message  STRING
>
>
> create view v1 as
> select T.*
> from billing_data_test,
> LATERAL TABLE(SplitUdtf(message)) as T(scate1,  scate2,  belong_local1,
> ssrc2,  gift,  coupon,  local_type);
>
> 异常:
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
> 'message' not found in any table
> (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 14:32:04,857] INFO ---  at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> (com.dataplatform.flink.util.FlinkDebugThread)
>
>
>
>
>
> guaishushu1...@163.com
>


Re: Flink SQL TableSource复用问题,相同数据源聚合多个指标,引擎创建多个相同的数据源

2020-09-17 Thread godfrey he
blink 根据每个算子的digest信息来判断是否可以reuse(只有digest完全一样才可以reuse),
例如table source节点,算子信息包括:表名,select的字段信息,其他push down之后的信息等。
你可以通过explain的方式把plan打印出来看看,source的digest是否一样

Jingsong Li  于2020年9月17日周四 下午2:45写道:

> 你仔细看看这两个数据源是不是有什么不同
> 只要有一点不同,Blink 就 reuse 不了
>
> On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote:
>
> > 场景描述:
> > 通过Flink SQL创建两个Kafka数据源,对数据源去重处理,然后Union ALL合并,并创建临时视图
> > 然后通过Flink SQL读取临时视图进行聚合计算指标,结果写入Redis
> > 问题描述:
> > Flink SQL 解析器会为每个聚合运算创建相同的两个数据源
> >
> > 在下面Blink Plan的配置说明中,引擎应该会优化复用相同的数据源
> > - table.optimizer.reuse-source-enabled
> > - table.optimizer.reuse-sub-plan-enabled
> >
> > 请问下,有人碰到类似问题么?
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
> Best, Jingsong Lee
>


Re: flink 1.9 关于回撤流的问题

2020-09-17 Thread godfrey he
可以用flink提供的“去重"语法来支持

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D

Shengkai Fang  于2020年9月15日周二 下午4:02写道:

> hi, 我对于使用upsert
>
> kafka能够省state感到疑惑。金竹老师提供的实现只是丢掉了delete消息,你的下游表依旧需要手动去重才可以得到准确的结果才对啊。如果每个下游表都手动去重这样子还能省state吗?
>
> star <3149768...@qq.com> 于2020年6月8日周一 上午9:38写道:
>
> > 非常感谢,正是我想要的。也谢谢金竹老师的分享!
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:"Sun.Zhu"<17626017...@163.com;
> > 发送时间:2020年6月7日(星期天) 凌晨0:02
> > 收件人:"user-zh@flink.apache.org" > 抄送:"user-zh@flink.apache.org" > 主题:回复:flink 1.9 关于回撤流的问题
> >
> >
> >
> > Hi,star
> > 金竹老师发过一篇文章,重写了KafkaConnector的实现,支持upsert模式,可以参考下[1]
> >
> >
> > [1]https://mp.weixin.qq.com/s/MSs7HSaegyWWU3Fig2PYYA
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月3日 14:47,star<3149768...@qq.com 写道:
> > 大家好,
> >
> >
> >
> >
> 在做实时统计分析的时候我将基础汇总层做成了一个回撤流(toRetractStream)输出到kafka里(因为后面很多实时报表依赖这个流,所以就输出了)
> > 问题是这个kafka里到回撤流还能转成flink 的RetractStream流吗,转成后我想注册成一张表,然后基于这个表再做聚合分析
> >
> >
> >
> >
> > 谢谢
>


Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-17 Thread godfrey he
Hi Dan,

What kind of joins [1] you are using? Currently, only temporal join and
join with table function
do not reshuffle the input data in Table API and SQL, other joins always
reshuffle the input data
based on join keys.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins

Best,
Godfrey


Dan Hill  于2020年9月17日周四 上午3:44写道:

> Hi Dawid!
>
> I see.  Yea, this would break my job after I move away from the prototype.
>
> How do other Flink devs avoid unnecessary reshuffles when sourcing data
> from Kafka?  Is the Table API early or not used often?
>
>
>
>
> On Wed, Sep 16, 2020 at 12:31 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Dan,
>>
>> I am afraid there is no mechanism to do that purely in the Table API yet.
>> Or I am not aware of one. If the reinterpretAsKeyedStream works for you,
>> you could use this approach and convert a DataStream (with the
>> reinterpretAsKeyedStream applied) to a Table[1] and then continue with the
>> Table API.
>>
>> On the topic of reinterpretAsKeyedStream, I wanted to stress out one
>> thing. I'd like to bring your attention to this warning:
>>
>> *WARNING*: The re-interpreted data stream *MUST* already be
>> pre-partitioned in *EXACTLY* the same way Flink’s keyBy would partition
>> the data in a shuffle w.r.t. key-group assignment.
>>
>> I think it is not trivial(or even not possible?) to achieve unless both
>> the producer and the consumer are Flink jobs with the same parallelism.
>>
>> Best,
>>
>> Dawid
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table
>> On 16/09/2020 18:22, Dan Hill wrote:
>>
>> Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well in
>> my prototype.
>>
>> On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Have you seen "Reinterpreting a pre-partitioned data stream as keyed
>>> stream" feature? [1] However I'm not sure if and how can it be integrated
>>> with the Table API. Maybe someone more familiar with the Table API can help
>>> with that?
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>>
>>> śr., 16 wrz 2020 o 05:35 Dan Hill  napisał(a):
>>>
 How do I avoid unnecessary reshuffles when using Kafka as input?  My
 keys in Kafka are ~userId.  The first few stages do joins that are usually
 (userId, someOtherKeyId).  It makes sense for these joins to stay on the
 same machine and avoid unnecessary shuffling.

 What's the best way to avoid unnecessary shuffling when using Table SQL
 interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
 keys for Kafka.







Re: flink-sql-gateway还会更新吗

2020-08-30 Thread godfrey he
已更新至flink1.11.1

godfrey he  于2020年8月24日周一 下午9:45写道:

> 我们会在这周让flink-sql-gateway支持1.11,请关注
> 另外,sql-client支持gateway模式,据我所知目前还没计划。
>
> shougou <80562...@qq.com> 于2020年8月24日周一 上午9:48写道:
>
>> 也有同样的问题,同时也问一下,sql client 计划在哪个版本支持gateway模式?多谢
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: flink1.11 sql api使用per job模式提交后,客户端退出

2020-08-24 Thread godfrey he
如果是通过TableEnvironment#execute方法提交需要设置execution.attached=true, 或者是通过flink
cli的 加上-d
如果是通过TableEnvironment#executeSql方法提交,需要代码里显示的等待作业结束:
TableResult tableResult = tEnv.executeSql(xxx);
// wait job finished
tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

lijufeng2016 <920347...@qq.com> 于2020年8月25日周二 上午9:34写道:

> flink1.11 sql api使用per
> job模式提交后,客户端退出,程序在yarn正常运行,客户端与yarn断开连接,与有没有办法不让客户端断开?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-sql-gateway还会更新吗

2020-08-24 Thread godfrey he
我们会在这周让flink-sql-gateway支持1.11,请关注
另外,sql-client支持gateway模式,据我所知目前还没计划。

shougou <80562...@qq.com> 于2020年8月24日周一 上午9:48写道:

> 也有同样的问题,同时也问一下,sql client 计划在哪个版本支持gateway模式?多谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink log4j2 问题

2020-08-24 Thread godfrey he
检查一下log4j2相关的版本,参考[1]

[1]
https://stackoverflow.com/questions/50970960/facing-issue-with-log4j2-java-lang-exceptionininitializererror

guaishushu1...@163.com  于2020年8月24日周一 上午11:18写道:

> SQL提交会出现这种问题???
> Caused by: java.lang.IllegalArgumentException: Initial capacity must be at
> least one but was 0
> at
> org.apache.logging.log4j.util.SortedArrayStringMap.(SortedArrayStringMap.java:102)
> at
> org.apache.logging.log4j.core.impl.ContextDataFactory.createContextData(ContextDataFactory.java:109)
> at
> org.apache.logging.log4j.core.impl.ContextDataFactory.(ContextDataFactory.java:57)
> ... 29 more
>
>
>
> guaishushu1...@163.com
>


Re: Format for timestamp type in Flink SQL

2020-08-18 Thread godfrey he
Hi Youngwoo,

> 1. TIMESTAMP WITH LOCAL TIME ZONE
Currently, SQL client uses legacy types for the collect sink, that
means `TIMESTAMP
WITH LOCAL TIME ZONE` is not supported.
you can refer to [1] to find the supported types, and there is a pr [2] to
fix this.

>2. TIMESTAMP(3) WITH LOCAL TIME ZONE
I do not reproduce the exception

> 3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE
sql parser does not support them yet.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html
[2] https://github.com/apache/flink/pull/12872

Best,
Godfrey

Youngwoo Kim (김영우)  于2020年8月16日周日 上午1:23写道:

> Hi Benchao,
>
> I include ['json.timestamp-format.standard' = 'ISO-8601'] to table's DDL
> but it does not work with slightly different errors:
>
> 1. TIMESTAMP WITH LOCAL TIME ZONE
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
>
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
>
> Caused by: org.apache.flink.table.api.TableException: Unsupported
> conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE' (conversion
> class: java.time.Instant) to type information. Only data types that
> originated from type information fully support a reverse conversion.
>
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)
>
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
>
> at
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>
> at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
>
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)
>
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)
>
> at
> org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
>
> at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)
>
> at
> org.apache.flink.table.client.gateway.local.result.CollectStreamResult.(CollectStreamResult.java:71)
>
> at
> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:101)
>
> at
> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:129)
>
> at
> org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)
>
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)
>
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)
>
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)
>
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)
>
> at java.util.Optional.ifPresent(Optional.java:159)
>
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
>
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
>
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
>
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
>
>
> 2. TIMESTAMP(3) WITH LOCAL TIME ZONE
>
>
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.UnsupportedOperationException: Unsupported type: TIMESTAMP(3)
> WITH LOCAL TIME ZONE
>
>
> 3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE
>
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at
> line 3, column 32.
> Was expecting:
> "LOCAL" ...
>
>
>
> It looks like the timestamp '-MM-ddTHH:mm:ss.SSSZ' is not supported in
> both 'SQL' and 'ISO-8601' format standard.
>
> Just curious, what is the default format for timestamp type with a time
> zone?
>
>
> Thanks,
>
> Youngwoo
>
>
> On Sat, Aug 15, 2020 at 8:16 PM Benchao Li  wrote:
>
>> Hi Youngwoo,
>>
>> What version of Flink and Json Format are you using?
>> From 1.11, we introduced  `json.timestamp-format.standard` to declare the
>> timestamp format.
>> You can try `timestamp with local zone` data type with `ISO-8601`
>> timestamp format.
>>
>> Youngwoo Kim (김영우)  于2020年8月15日周六 下午12:12写道:
>>
>>> Hi,
>>>
>>> I'm trying to create a table using Flink SQL to query from a Kafka
>>> topic. Messages from Kafka look like following:
>>>
>>> (snip)
>>> "svc_mgmt_num":"7749b6a7e17127d43431e21b94f4eb0c116..."
>>> 

Re: ScalarFunction 访问 state

2020-08-18 Thread godfrey he
看看deduplication语法[1] 是否满足你的需求

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication

forideal  于2020年8月17日周一 下午12:13写道:

> Hi,
>
> 最近我有一个使用 Flink SQL 做简单的数据去重的需求,想使用 Flink 的 `ScalarFunction`,通过阅读
> API 发现 FunctionContext context 并不支持访问 state。
> 我准备使用 Guava cache 做,不知道小伙伴有没有更好的建议哈!感谢。
>
>
> Best,forideal


Re: flink 1.11 SQL idea调试无数据也无报错

2020-08-18 Thread godfrey he
> GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
是要1个小时的window结束才会出结果。
你可以通过把window缩小或者设置early-fire来提早看到数据
table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay=xx

>  手动拿到那个executeSql的返回的TableResult,然后去   wait job finished
这个是为了防止本地ide里执行时executeSql执行结束后进程退出导致job也强制结束

DanielGu <610493...@qq.com> 于2020年8月17日周一 下午4:04写道:

> hi,
> flink 1.11 SQL idea调试 有其他伙伴了解吗?求赐教.
> 最近调试卡在这里..有点出不来了
> 十分感谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11.1 sql client 流式 join 出现预期之外的含 null 行

2020-08-13 Thread godfrey he
可以把原始的计算结果打印出来,执行 set execution.result-mode=changelog
(如果source有delete消息,可能会出现null值)


LittleFall <1578166...@qq.com> 于2020年8月13日周四 下午3:33写道:

> mysql 的建表语句
> use test;
> create table base (
> id int primary key,
> location varchar(20)
> );
> create table stuff(
> id int primary key,
> b_id int,
> name varchar(20)
> );
>
> flink sql client 的建表语句
> create table base (
> id int primary key,
> location varchar(20)
> )WITH (
>'connector' = 'kafka',
>'topic' = 'example',
>'properties.group.id' = 'testGroup',
>'scan.startup.mode' = 'latest-offset',
>'properties.bootstrap.servers' = 'localhost:9092',
>'format' = 'canal-json'
> );
> create table stuff(
> id int primary key,
> b_id int,
> name varchar(20)
> )WITH (
>'connector' = 'kafka',
>'topic' = 'example',
>'properties.group.id' = 'testGroup',
>'scan.startup.mode' = 'latest-offset',
>'properties.bootstrap.servers' = 'localhost:9092',
>'format' = 'canal-json'
> );
>
> flink 查询语句
> select distinct stuff.id s_id, base.id b_id, base.location, stuff.name
> from stuff inner join base
> on stuff.b_id = base.id;
>
> mysql 插入语句
> insert into base values (1, 'beijing');
> insert into stuff values (1, 1, 'zz');
>
> flink 结果
> <
> http://apache-flink.147419.n8.nabble.com/file/t858/2020-08-13_15-12-36_%E7%9A%84%E5%B1%8F%E5%B9%95%E6%88%AA%E5%9B%BE.png>
>
>
> mysql 执行同样的查询的结果:
> +--+--+--+--+
> | s_id | b_id | location | name |
> +--+--+--+--+
> |1 |1 | beijing  | zz   |
> +--+--+--+--+
> 1 row in set (0.01 sec)
>
>
> 而且有时候连结果正确的行都不会出现,只会出现含 null 的行。
>
> 求助大家。。。
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 Thread godfrey he
sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。

Zhao,Yi(SEC)  于2020年8月13日周四 下午5:11写道:

> A是10机器集群(HA模式,独立集群),B作为提交机器。
> 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
> ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
> 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
> flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
> flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
>
> 结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
> 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
> 
>
> 在 2020/8/13 下午3:10,“Jeff Zhang” 写入:
>
> 你的10台机器是flink standalone还是 yarn集群 ?
> 如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
>
> 另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
> 或者加入钉钉群讨论,钉钉群号: 32803524
>
>
> Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:
>
> > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> > 现在比较混乱,哪些jar需要放到A,哪些放到B。
> >
> >
> > (1) kafka ssl
> >
> 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
> >
> > (2)
> >
> flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
> >
> >
> >
> >
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
> >
> > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
> >
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>


Re: GroupBy with count on a joint table only let met write using toRetractStream

2020-08-11 Thread godfrey he
Hi Faye,

1) In your sql, different events are for different groups, it seems hard to
extract a global Filter into DataStream.
2) AFAK, you can just drop the retract message (the flag is false), and
then convert the retract stream to append stream.
 The downstream job needs to duplicate the records, just like [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication

Best,
Godfrey

Faye Pressly  于2020年8月8日周六 上午3:30写道:

> Sorry just notice I made a typo in the last table (clickAdvertId != null
> instead of clickCount !=null)
>
> Table allImpressionTable = impressionsTable
>   .leftOuterJoin(clicksTable, "clickImpId = impImpressionId &&
> clickMinute = impMinute")
>   .groupBy("impAdvertId, impVariationName, impMinute")
>   .select("impAdvertId, impVariationName, clickAdvertId.count as
> clickCount, impMinute")
>.where("clickAdvertId != null");
>
> --
> *From:* Faye Pressly
> *Sent:* Friday, August 7, 2020 9:28 PM
> *To:* user@flink.apache.org 
> *Subject:* GroupBy with count on a joint table only let met write using
> toRetractStream
>
> Hello,
>
> I have a steam of events (coming from a Kinesis Stream) of this form:
>
> impressionId | advertid | variationName | eventType | eventTime
>
> The end goal is to output back on a Kinesis Stream the count of event of
> type 'impression' and the count of events of type 'click'
>
> however, I need to drop (or ignore) event of type clicks that don't have a
> matching impressionId with an event of type 'impression' (So basically I
> need to discard click event that don't have an impression)
>
> This is how tackled my solution:
>
> // Convert the stream into a table
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> Table eventsTable = tEnv.fromDataStream(eventsStream, "impressionId,
> advertId, variationName, eventType, eventTime.rowtime");
> tEnv.registerTable("Events", eventsTable);
>
> // Create a table with only event of type clicks
> Table clicksTable = eventsTable
>   .where("eventType = 'click'")
>
> .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
>   .groupBy("impressionId, advertId, variationName, minuteWindow")
>   .select("impressionId as clickImpId, creativeId as clickAdvertId,
> variationName as clickVariationName, minuteWindow.rowTime as clickMinute");
>
> // Create a table with only event of type impression
> Table impressionsTable = eventsTable
>   .where("eventType = 'impression'")
>
> .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
>   .groupBy("impressionId, advertId, variationName, minuteWindow")
>   .select("impressionId as impImpressionId, advertId as impAdvertId,
> variationName as impVariationName, eventTime, minuteWindow.rowTime as
> impMinute");
>
> // left join the impression with the clicks using the impressionId as well
> as the temporal field
> //and then group by to generate a count of all the click that have a
> matching impression (aka row where clickAdvertId is not null)
> Table allImpressionTable = impressionsTable
>   .leftOuterJoin(clicksTable, "clickImpId = impImpressionId &&
> clickMinute = impMinute")
>   .groupBy("impAdvertId, impVariationName, impMinute")
>   .select("impAdvertId, impVariationName, clickAdvertId.count as
> clickCount, impMinute")
>.where("clickCount != null");
> [ same logic to count impressions]
>
> Now to debug and to see if the counts are correct I usually use "
> tEnv.toAppendStream(allImpressionTable, Result.class).print()" and I'm
> able to use that new created stream to send it back on a kinesis Stream
>
> However I have an error saying that I cannot use toAppendStream and that
> instead I have to use toRetractStream. It indeed works and I can see the
> counts in the output are correct however I don't understand how I can use
> the result contained in this new stream because it has multiple rows with
> "true"/"false" and the correct count is usuall the last entry with the
> "true" key.
>
> I have multiple question:
>
> 1) I'm very new with Flink and I would like to know if my approach to
> filter-out un-matching events is the correct one ? (stream -> table and
> joins -> stream)
> Is there a much easier way of doing this ? Is it perhaps possible to
> filter all these events directly in the DataStream?
>
>
> 2) How do I use the retractStream? How do use it in order to send the
> final counts to a sink and not the entirety of the "true/False"
> insert/Delete rows?
>
>
> Thank you!
>


Re: flink 1.11 使用sql将流式数据写入hive

2020-08-11 Thread godfrey he
 tEnv.executeSql(insertSql); 是异步提交完任务就返回了,
如果是IDE里运行的话话,进程就直接退出导致job也就结束了。需要需要等到job结束,
目前可以通过下面这种方式
TableResult result = tEnv.executeSql(insertSql);
result..getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

另外  tEnv.executeSql(insertSql); 已经提交作业了,不需要调用  bsEnv.execute("test");

liya...@huimin100.cn  于2020年8月11日周二 下午3:20写道:

> 下面粘的就是主程序代码
>
> 能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助
>
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);DataStream dataStream = bsEnv.addSource(new
> MySource());//构造hive catalog
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a
> local path
> String version = "1.1.0";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> tEnv.createTemporaryView("users", dataStream);
>
> Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts,
> '-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM
> users");
>
>
> tEnv.toRetractStream(result3, TypeInformation.of(new
> TypeHint>(){})).print("res");//
> 如果hive中已经存在了相应的表,则这段代码省略
> //String hiveSql = "CREATE TABLE fs_table (\n" +
> // "  user_id STRING,\n" +
> // "  order_amount DOUBLE \n" +
> // ") partitioned by (dt string,h string,m string) \n"
> +
> // "stored as textfile \n" +
> // "TBLPROPERTIES (\n" +
> // "
> 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
> // "  'sink.partition-commit.delay'='0s',\n" +
> // "
> 'sink.partition-commit.trigger'='partition-time',\n" +
> // "  'sink.partition-commit.policy.kind'='metastore'"
> +
> // ")";
> //tEnv.executeSql(hiveSql);
>
> String insertSql = "insert into table fs_table partition (dt,h,m)
> SELECT userId, amount, DATE_FORMAT(ts, '-MM-dd') dt, DATE_FORMAT(ts,
> 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";
>
> tEnv.executeSql(insertSql);
>
> bsEnv.execute("test");
>
>
> liya...@huimin100.cn
>


Re: flink sql状态清理问题

2020-08-09 Thread godfrey he
配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗?

Benchao Li  于2020年8月10日周一 上午10:36写道:

> Hi,
>
> 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
>
> op <520075...@qq.com> 于2020年8月10日周一 上午10:27写道:
>
> > Hi
> >   在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
> >  val config = tableConfig.getConfiguration()
> >  config.setString("table.exec.mini-batch.enabled",
> > "true")
> > 
> config.setString("table.exec.mini-batch.allow-latency",
> > "5s")
> >  config.setString("table.exec.mini-batch.size", "20")
> >
> 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
> > 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: 来自郭华威的邮件

2020-08-09 Thread godfrey he
BatchTableEnvironmentImpl 属于 old planner,
缺少 flink-table-planner_${scala.binary.version}.jar 的依赖

郭华威  于2020年8月10日周一 上午10:21写道:

> flink1.11.1 使用tableApi  报错:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Create BatchTableEnvironment failed.
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
> at
> yueworld.worldCount.BatchWordCount_tablesql.main(BatchWordCount_tablesql.java:24)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:509)
> ... 2 more
> 但是相关的依赖都有的,下面是pom文件:
> 
>UTF-8
>1.11.1
>5.1.40
>2.11
>2.11.12
>1.8
>${java.version}
>${java.version}
> 
>
> 
>
>
>   org.apache.flink
>   flink-java
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>
> flink-table-api-java-bridge_${scala.binary.version}
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>
> flink-table-planner-blink_${scala.binary.version}
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>   flink-table-common
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>   flink-connector-jdbc_${scala.binary.version}
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>
> flink-sql-connector-kafka-0.11_${scala.binary.version}
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>   flink-json
>   ${flink.version}
>
>
> 
> 
>   mysql
>   mysql-connector-java
>   ${mysql.version}
>
>
> 
> 
>   org.apache.flink
>   flink-clients_2.11
>   1.11.1
>
>
>
>   org.apache.flink
>   flink-connector-jdbc_2.11
>   1.11.1
>
>
>
>   org.apache.flink
>   flink-streaming-scala_2.11
>   1.11.0
>
>
> 
> 
>   org.slf4j
>   slf4j-log4j12
>   1.7.7
>   runtime
>
>
>   log4j
>   log4j
>   1.2.17
>   runtime
>
> 
>
>
>
>
>
>


Re: Submit Flink 1.11 job from java

2020-08-06 Thread godfrey he
hi Flavio,
Maybe you can try env.executeAsync method,
which just submits the job and returns a JobClient.

Best,
Godfrey

Flavio Pompermaier  于2020年8月6日周四 下午9:45写道:

> Hi to all,
> in my current job server I submit jobs to the cluster setting up an SSH
> session with the JobManager host and running the bin/flink run command
> remotely (since the jar is put in the flink-web-upload directory).
> Unfortunately, this approach makes very difficult to caputre all exceptions
> that a job submission could arise
> Is there a better way to invoke the execution of a main class contained in
> a jar file uploaded on the Job Manager? Ideally I could invoke the Flink
> REST API but the problem is that I need to call some code after
> env.execute() and that's not possible if I use them..every java code after
> env.execute() is discarded, while this does not happen if I use the CLI
> client.
>
> I know that there was some client refactoring in Flink 1.11 but I didn't
> find a solution to this problem yet.
>
> Thanks in advance for any help,
> Flavio
>


Re: Re: Re: FLINK SQL view的数据复用问题

2020-08-05 Thread godfrey he
目前sql-client还不支持。关于纯SQL文本statement set的支持,
目前社区已经达成语法的一致意见,应该后续会慢慢的支持。

kandy.wang  于2020年8月5日周三 下午10:43写道:

>
>
>
>
>
>
> @ godfrey
> 你说的这种StatementSet 提交方式,在sql-client提交任务的时候不支持吧? 可以给加上么。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-04 19:36:56,"godfrey he"  写道:
> >调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用
> >
> >kandy.wang  于2020年8月4日周二 下午6:21写道:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> @ godfrey
> >> thanks。刚试了一下,source -> Deduplicate  ->
> >> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
> >> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
> >>
> >>
> >> 在 2020-08-04 17:26:02,"godfrey he"  写道:
> >> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
> >> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
> >> >
> >> >kandy.wang  于2020年8月4日周二 下午5:20写道:
> >> >
> >> >> FLINK SQL view相关问题:
> >> >> create view order_source
> >> >>
> >> >> as
> >> >>
> >> >> select order_id, order_goods_id, user_id,...
> >> >>
> >> >> from (
> >> >>
> >> >> ..  proctime,row_number() over(partition by order_id,
> >> >> order_goods_id order by proctime desc) as rownum
> >> >>
> >> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
> >> properties.group.id'='flink_etl_kafka_hbase',
> >> >> 'scan.startup.mode'='latest-offset') */
> >> >>
> >> >> ) where  rownum = 1 and  price > 0;
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT),)
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_date as rowkey,
> >> >>
> >> >> sum(amount) as saleN,
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_date
> >> >>
> >> >> );
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT))
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_hour as rowkey,sum(amount) as saleN,
> >> >>
> >> >>
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_hour
> >> >>
> >> >> );
> >> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer
> group。
> >> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  ->
> sink
> >> >> 2
> >> >>
> >> >>
> >> >> 本意是想通过view  order_source
> >> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
> >> >>
> >> >>
> >>
>


Re: Unexpected unnamed sink in SQL job

2020-08-04 Thread godfrey he
I think we assign a meaningful name to sink Transformation
 like other Transformations in StreamExecLegacySink/BatchExecLegacySink.

Paul Lam  于2020年8月4日周二 下午5:25写道:

> Hi Jingsong,
>
> Thanks for your input. Now I understand the design.
>
> I think in my case the StreamingFileCommitter is not chained because its
> upstream operator is not parallelism 1.
>
> BTW, it’d be better if it has a more meaningful operator name.
>
> Best,
> Paul Lam
>
> 2020年8月4日 17:11,Jingsong Li  写道:
>
> StreamingFileCommitter
>
>
>


Re: Re: FLINK SQL view的数据复用问题

2020-08-04 Thread godfrey he
调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用

kandy.wang  于2020年8月4日周二 下午6:21写道:

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> @ godfrey
> thanks。刚试了一下,source -> Deduplicate  ->
> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
>
>
> 在 2020-08-04 17:26:02,"godfrey he"  写道:
> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
> >
> >kandy.wang  于2020年8月4日周二 下午5:20写道:
> >
> >> FLINK SQL view相关问题:
> >> create view order_source
> >>
> >> as
> >>
> >> select order_id, order_goods_id, user_id,...
> >>
> >> from (
> >>
> >> ..  proctime,row_number() over(partition by order_id,
> >> order_goods_id order by proctime desc) as rownum
> >>
> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
> properties.group.id'='flink_etl_kafka_hbase',
> >> 'scan.startup.mode'='latest-offset') */
> >>
> >> ) where  rownum = 1 and  price > 0;
> >>
> >>
> >>
> >>
> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
> as
> >> BIGINT),)
> >>
> >> from
> >>
> >> (
> >>
> >> select order_date as rowkey,
> >>
> >> sum(amount) as saleN,
> >>
> >> from order_source
> >>
> >> group by order_date
> >>
> >> );
> >>
> >>
> >>
> >>
> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
> as
> >> BIGINT))
> >>
> >> from
> >>
> >> (
> >>
> >> select order_hour as rowkey,sum(amount) as saleN,
> >>
> >>
> >>
> >> from order_source
> >>
> >> group by order_hour
> >>
> >> );
> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
> >> 2
> >>
> >>
> >> 本意是想通过view  order_source
> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
> >>
> >>
>


Re: FLINK SQL view的数据复用问题

2020-08-04 Thread godfrey he
blink planner支持将多sink的query优化成尽量复用重复计算部分。
1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务

kandy.wang  于2020年8月4日周二 下午5:20写道:

> FLINK SQL view相关问题:
> create view order_source
>
> as
>
> select order_id, order_goods_id, user_id,...
>
> from (
>
> ..  proctime,row_number() over(partition by order_id,
> order_goods_id order by proctime desc) as rownum
>
> from hive.temp_dw.dm_trd_order_goods/*+ 
> OPTIONS('properties.group.id'='flink_etl_kafka_hbase',
> 'scan.startup.mode'='latest-offset') */
>
> ) where  rownum = 1 and  price > 0;
>
>
>
>
> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
> BIGINT),)
>
> from
>
> (
>
> select order_date as rowkey,
>
> sum(amount) as saleN,
>
> from order_source
>
> group by order_date
>
> );
>
>
>
>
> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
> BIGINT))
>
> from
>
> (
>
> select order_hour as rowkey,sum(amount) as saleN,
>
>
>
> from order_source
>
> group by order_hour
>
> );
> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
> 2
>
>
> 本意是想通过view  order_source
> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
>
>


Re: Re: 有界数据中batch和stream的区别

2020-08-04 Thread godfrey he
你的运行环境是啥?能提供一下相关配置吗?

chenxuying  于2020年8月4日周二 下午2:46写道:

> 你好,请问下我修改后的语句是
> insert into print_sink select game_id,count(id) from mysql_source group by
> game_id
> 然后在执行的时候如果选择的是streamMode他会打印出Changelog,如下
> 2> +I(12,1)
> 5> +I(12555,1) 1> +I(122,1) 3> +I(13,1) 6> +I(1,1) 6> -U(1,1) 6> +U(1,2)
> 6> -U(1,2) 6> +U(1,3) 6> -U(1,3) 6> +U(1,4) 6> -U(1,4)
>
>
> 然后如果我使用的是batchMode,他就报错了
> org.apache.flink.util.FlinkException: Error while shutting the
> TaskExecutor down.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:440)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$2(TaskExecutor.java:425)
> ...
> Caused by: java.util.concurrent.CompletionException:
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.util.JavaGcCleanerWrapper
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> ...
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
> ... 21 more
> Suppressed: org.apache.flink.util.FlinkException: Could not properly shut
> down the TaskManager services.
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:236)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:462)
> at...
> ... 21 more
> Caused by: org.apache.flink.util.FlinkException: Could not close resource.
> at
> org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:204)
> ... 37 more
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.util.JavaGcCleanerWrapper
> at
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:94)
> at
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
> ...
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
> ... 21 more
> [CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize
> class org.apache.flink.util.JavaGcCleanerWrapper]
>
>
> 不知道您是否知道原因
>
>
> 在 2020-08-04 12:11:32,"godfrey he"  写道:
> >逻辑上批产生的结果是Table,流产生的结果是Changelog。
> >你在例子中Table的结果和changelog的结果是一样的,所以你感觉差不多。
> >最简单的方式可以将query改为带group by的,再看结果的差异。
> >更多关于Table和Changelog的概念可以参考 [1]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
> >
> >chenxuying  于2020年8月4日周二 上午11:44写道:
> >
> >> hi :
> >> flink table sql 1.11.0
> >> 在EnvironmentSettings中可以设置BatchMode或StreamingMode
> >>
> >>
> >> EnvironmentSettings environmentSettings =
> EnvironmentSettings.newInstance()
> >> //.inStreamingMode()
> >> .inBatchMode()
> >> .build();
> >>
> >>
> >> 如果使用mysql作为source , 使用这两种模式都可以运行 , 效果都一样 , 感觉对批和流的理解还不够 , 不知道其中区别 ,
> >> 不知道大佬们有没有例子可以比较容易理解
> >> 我的代码
> >> EnvironmentSettings environmentSettings =
> EnvironmentSettings.newInstance()
> >> //.inStreamingMode()
> >> .inBatchMode()
> >> .build();
> >> TableEnvironment tableEnvironment =
> >> TableEnvironment.create(environmentSettings);
> >> tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
> >> " id bigint, " +
> >> "  game_id varchar, " +
> >> "  PRIMARY KEY (id) NOT ENFORCED  " +
> >> " )  " +
> >> " with ( " +
> >> "'connector' = 'jdbc',  " +
> >> " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
> >> " 'username' = 'root' , " +
> >> " 'password' = 'root', " +
> >> " 'table-name' = 'mysqlsink' , " +
> >> " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
> >> " 'sink.buffer-flush.interval' = '2s', " +
> >> " 'sink.buffer-flush.max-rows' = '300' " +
> >> " )");
> >> tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
> >> " id bigint, " +
> >> "  game_id varchar, " +
> >> "  PRIMARY KEY (id) NOT ENFORCED  " +
> >> " )  " +
> >> " with ( " +
> >> "'connector' = 'print'  " +
> >> " )");
> >> tableEnvironment.executeSql("insert into print_sink select id,game_id
> from
> >> mysql_source");
>


Re: 有界数据中batch和stream的区别

2020-08-03 Thread godfrey he
逻辑上批产生的结果是Table,流产生的结果是Changelog。
你在例子中Table的结果和changelog的结果是一样的,所以你感觉差不多。
最简单的方式可以将query改为带group by的,再看结果的差异。
更多关于Table和Changelog的概念可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html

chenxuying  于2020年8月4日周二 上午11:44写道:

> hi :
> flink table sql 1.11.0
> 在EnvironmentSettings中可以设置BatchMode或StreamingMode
>
>
> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
> //.inStreamingMode()
> .inBatchMode()
> .build();
>
>
> 如果使用mysql作为source , 使用这两种模式都可以运行 , 效果都一样 , 感觉对批和流的理解还不够 , 不知道其中区别 ,
> 不知道大佬们有没有例子可以比较容易理解
> 我的代码
> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
> //.inStreamingMode()
> .inBatchMode()
> .build();
> TableEnvironment tableEnvironment =
> TableEnvironment.create(environmentSettings);
> tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "'connector' = 'jdbc',  " +
> " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
> " 'username' = 'root' , " +
> " 'password' = 'root', " +
> " 'table-name' = 'mysqlsink' , " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
> " 'sink.buffer-flush.interval' = '2s', " +
> " 'sink.buffer-flush.max-rows' = '300' " +
> " )");
> tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "'connector' = 'print'  " +
> " )");
> tableEnvironment.executeSql("insert into print_sink select id,game_id from
> mysql_source");


Re: Flink DDL 写 Hive parquet 是否支持 snappy压缩

2020-08-03 Thread godfrey he
parquet.compression=SNAPPY,更多信息可参考[1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/parquet.html#format-options





lydata  于2020年8月4日周二 上午11:39写道:

>  Flink DDL的方式 写 Hive parquet  格式 ,是否支持 snappy压缩,如果支持 请问下参数是什么?


Re: UDF:Type is not supported: ANY

2020-08-03 Thread godfrey he
你把Map换为Map试试

zilong xiao  于2020年8月3日周一 下午4:56写道:

> 目前转List可以用数组代替,Map貌似没法成功运行
>
> zilong xiao  于2020年8月3日周一 上午10:43写道:
>
> > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is
> not
> > supported:
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> Json2Map
> > udf应该怎么操作呢?求前辈指导
> >
> > udfd代码如下:
> >
> > public class Json2List extends ScalarFunction {
> >
> >private static final Logger LOG =
> LoggerFactory.getLogger(Json2List.class);
> >
> >private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
> >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> >   .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;
> >
> >public Json2List(){}
> >
> >public List eval(String param) {
> >   List result = new ArrayList<>();
> >   try {
> >  List> list = OBJECT_MAPPER.readValue(param,
> List.class);
> >  for(Map map : list){
> > result.add(OBJECT_MAPPER.writeValueAsString(map));
> >  }
> >  return result;
> >   } catch (JsonProcessingException e){
> >  LOG.error("failed to convert json to array, param is: {}",
> param, e);
> >   }
> >   return result;
> >}
> >
> >
> >@Override
> >public TypeInformation> getResultType(Class[]
> signature) {
> >   return Types.LIST(Types.STRING);
> >}
> >
> > }
> >
> >
>


Re: 数据预览

2020-08-02 Thread godfrey he
如果你想在client端拿到query的结果做preview的话,目前API层面支持直接collect或者print执行结果,可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#execute-a-query

Jeff Zhang  于2020年8月1日周六 下午11:01写道:

> Apache Zeppelin有自己的rest api,你可以用rest api来提交flink sql
> 以及拿sql结果,目前Zeppelin社区正在做一个Client API (Zeppelin SDK),
> 用户可以更加方便的调用Zeppelin的功能。具体可以参考
> https://issues.apache.org/jira/browse/ZEPPELIN-4981
>
> 这里有Sample code 可以参考
>
> https://github.com/zjffdu/zeppelin/blob/ZEPPELIN-4981/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java#L298
>
> 对于Flink on Zeppelin感兴趣的,可以加入钉钉群:32803524
>
>
>
> forideal  于2020年8月1日周六 下午7:49写道:
>
> > 你好,我的朋友
> >
> >
> >最近我看 Flink doc 中的文档中有了如下 connector
> >   DataGen
> >   Print
> >   BlackHole
> >这大大的方便了开发和调试。不过,我还是不太满足,想了解一下数据预览相关的做法。
> >比如我想,如果我有一个 Flink 的 `driver` ,然后,我使用这个 driver 提交一条 SQL,我从
> ResultSet
> > 中获取数据。这样又可以大大的方面我们的 Flink SQL 开发者。
> >在社区中,我已经体验了 Apache Zeppelin ,他可以让我提交 Flink SQL,然后在页面上面等待刷新的结果,但是
> > Zeppelin 目前不能很好的集成到我们的 Flink web IDE 中。想了解一下如何实现数据预览。
> >
> >
> >Best forideal
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: StatementSet 里添加多个insertsql执行

2020-07-30 Thread godfrey he
StatementSet 中的多个insert会被编译成一个job提交。
你能提供一下对应的代码样例吗?

op <520075...@qq.com> 于2020年7月30日周四 下午3:57写道:

> 大家好,我发现StatementSet 里添加2个insertsql执行的时候会启动两个application,
> 这两个任务除了sink都是一样的吗?这样是不是会重复计算和浪费资源,而且两边的数据可能不同步,
> 有什么办法能解决?
> 谢谢


Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread godfrey he
Yes, The pr still needs to be improved.
In most cases, there are more than one statement in the sql file,
so -f option should support multiple statements.
however, a related PR [1] has not completed yet.

[1] https://github.com/apache/flink/pull/8738

Best,
Godfrey

Jun Zhang  于2020年7月29日周三 上午10:17写道:

> hi,godfrey:
> Thanks for your reply
>
> 1. I have seen the -u parameter, but my sql file may not only include
> 'insert into select ', but also SET, DDL, etc.
>
> 2. I may not have noticed this issue. I took a look at this issue. I think
> this issue may have some problems. For example, he finally called the
> CliClient.callCommand method.
> But I think that many options in callCommand are not completely suitable
> for sql files, such as HELP, CLEAR, SELECT, etc. The select operation opens
> a window to display the results, obviously this is not suitable for
> executing sql files
>
> godfrey he  于2020年7月29日周三 上午9:56写道:
>
>> hi Jun,
>>
>> Currently, sql client has supported -u option, just like:
>>  ./bin/sql-client.sh embedded -u "insert_statement".
>>
>> There is already a JIRA [1] that wants to support -f option
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12828
>>
>> Best,
>> Godfrey
>>
>> Jun Zhang  于2020年7月29日周三 上午9:22写道:
>>
>>> I want to execute some flink sql batch jobs regularly, such as 'insert
>>> into
>>> select .', but I can't find a suitable method so far, so reference
>>>  hive, I changed the source code and add a  '--filename'  parameter  so
>>> that we can execute a sql file.
>>>
>>> like this:
>>>
>>> /home/flink/bin/sql-client.sh embedded -f flink.sql
>>>
>>> what about any ideas or plans for this feature community?
>>>
>>


Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread godfrey he
Yes, The pr still needs to be improved.
In most cases, there are more than one statement in the sql file,
so -f option should support multiple statements.
however, a related PR [1] has not completed yet.

[1] https://github.com/apache/flink/pull/8738

Best,
Godfrey

Jun Zhang  于2020年7月29日周三 上午10:17写道:

> hi,godfrey:
> Thanks for your reply
>
> 1. I have seen the -u parameter, but my sql file may not only include
> 'insert into select ', but also SET, DDL, etc.
>
> 2. I may not have noticed this issue. I took a look at this issue. I think
> this issue may have some problems. For example, he finally called the
> CliClient.callCommand method.
> But I think that many options in callCommand are not completely suitable
> for sql files, such as HELP, CLEAR, SELECT, etc. The select operation opens
> a window to display the results, obviously this is not suitable for
> executing sql files
>
> godfrey he  于2020年7月29日周三 上午9:56写道:
>
>> hi Jun,
>>
>> Currently, sql client has supported -u option, just like:
>>  ./bin/sql-client.sh embedded -u "insert_statement".
>>
>> There is already a JIRA [1] that wants to support -f option
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12828
>>
>> Best,
>> Godfrey
>>
>> Jun Zhang  于2020年7月29日周三 上午9:22写道:
>>
>>> I want to execute some flink sql batch jobs regularly, such as 'insert
>>> into
>>> select .', but I can't find a suitable method so far, so reference
>>>  hive, I changed the source code and add a  '--filename'  parameter  so
>>> that we can execute a sql file.
>>>
>>> like this:
>>>
>>> /home/flink/bin/sql-client.sh embedded -f flink.sql
>>>
>>> what about any ideas or plans for this feature community?
>>>
>>


Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread godfrey he
hi Jun,

Currently, sql client has supported -u option, just like:
 ./bin/sql-client.sh embedded -u "insert_statement".

There is already a JIRA [1] that wants to support -f option

[1] https://issues.apache.org/jira/browse/FLINK-12828

Best,
Godfrey

Jun Zhang  于2020年7月29日周三 上午9:22写道:

> I want to execute some flink sql batch jobs regularly, such as 'insert into
> select .', but I can't find a suitable method so far, so reference
>  hive, I changed the source code and add a  '--filename'  parameter  so
> that we can execute a sql file.
>
> like this:
>
> /home/flink/bin/sql-client.sh embedded -f flink.sql
>
> what about any ideas or plans for this feature community?
>


Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-28 Thread godfrey he
这个问题只能说是使用TableEnvironment不当的问题。ververica的gateway的模式其实就是多线程。
创建TableEnvironment和使用TableEnvironment可能不是一个线程,worker线程是被复用的。
简单来说就是:
当session创建的时候,worker thread1 会创建一个TableEnvironment,
然后当后续其他该session请求过来时候,可能是 worker thread2使用该TableEnvironment执行sql。

这个其实就是在多线程情况下使用TableEnvironment。不符合TableEnvironment只能在单线程使用的约束。

wind.fly@outlook.com  于2020年7月28日周二 下午2:09写道:

>
> gateway就类似于一个web服务,大概流程是建立连接时会初始化一个session,在session里面初始化TableEnvironment,然后根据sql类型做不同的操作,比如select语句会去执行sqlQuery,具体可查看
> https://github.com/ververica/flink-sql-gateway。
>
> 另外,加了RelMetadataQueryBase.THREAD_PROVIDERS
>
> .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))后确实不报这个错了。
>
> 题外话,个人认为flink不应该将这样的异常抛给用户去解决,除非我去深入研究源码,要不然根本无法搞清楚具体发生了什么,在封装性上还有待改善。
> ________
> 发件人: godfrey he 
> 发送时间: 2020年7月28日 13:55
> 收件人: user-zh 
> 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
>
> 我的怀疑点还是多线程引起的。
> 你能具体描述一下你们gateway的行为吗? 是一个web server?
>
> 另外,你可以在table env执行query前加上
> RelMetadataQueryBase.THREAD_PROVIDERS
>
> .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
> 这句话临时fix。
>
> wind.fly@outlook.com  于2020年7月28日周二
> 上午11:02写道:
>
> > 不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment
> > 
> > 发件人: godfrey he 
> > 发送时间: 2020年7月28日 9:58
> > 收件人: user-zh 
> > 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
> >
> > 你们是否在多线程环境下使用 TableEnvironment ?
> > TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
> >
> > godfrey he  于2020年7月28日周二 上午9:55写道:
> >
> > > hi 能给出详细的schema信息吗?
> > >
> > > wind.fly@outlook.com  于2020年7月27日周一
> > > 下午7:02写道:
> > >
> > >> 补充一下,执行的sql如下:
> > >>
> > >> select order_no, order_time from
> > >> x.ods.ods_binlog_test_trip_create_t_order_1
> > >>
> > >> 
> > >> 发件人: wind.fly@outlook.com 
> > >> 发送时间: 2020年7月27日 18:49
> > >> 收件人: user-zh@flink.apache.org 
> > >> 主题: flink1.11.0 执行sqlQuery时报NullPointException
> > >>
> > >> Hi,all:
> > >>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
> > >> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
> > >> Caused by: java.lang.NullPointerException
> > >>   at java.util.Objects.requireNonNull(Objects.java:203)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
> > >>   at
> > >>
> >
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
> > >>   at
> org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
> > >>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
> > >>   at
> > >> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> > >>   at
> > >>
>

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 Thread godfrey he
我的怀疑点还是多线程引起的。
你能具体描述一下你们gateway的行为吗? 是一个web server?

另外,你可以在table env执行query前加上
RelMetadataQueryBase.THREAD_PROVIDERS
.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
这句话临时fix。

wind.fly@outlook.com  于2020年7月28日周二 上午11:02写道:

> 不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment
> 
> 发件人: godfrey he 
> 发送时间: 2020年7月28日 9:58
> 收件人: user-zh 
> 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
>
> 你们是否在多线程环境下使用 TableEnvironment ?
> TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
>
> godfrey he  于2020年7月28日周二 上午9:55写道:
>
> > hi 能给出详细的schema信息吗?
> >
> > wind.fly@outlook.com  于2020年7月27日周一
> > 下午7:02写道:
> >
> >> 补充一下,执行的sql如下:
> >>
> >> select order_no, order_time from
> >> x.ods.ods_binlog_test_trip_create_t_order_1
> >>
> >> 
> >> 发件人: wind.fly@outlook.com 
> >> 发送时间: 2020年7月27日 18:49
> >> 收件人: user-zh@flink.apache.org 
> >> 主题: flink1.11.0 执行sqlQuery时报NullPointException
> >>
> >> Hi,all:
> >>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
> >> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
> >> Caused by: java.lang.NullPointerException
> >>   at java.util.Objects.requireNonNull(Objects.java:203)
> >>   at
> >>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> >>   at
> >>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> >>   at
> >>
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
> >>   at
> >>
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
> >>   at
> >>
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
> >>   at
> >>
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
> >>   at
> >>
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
> >>   at
> >>
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
> >>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
> >>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
> >>   at
> >> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> >>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
> >>   at
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
> >>   at
> >>
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
> >>   at
> >>
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
> >>   at
> >>
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
> >>   at
> >>
> org.apache.flink.table.catalog

Re: flink1.11.1使用Table API Hive方言的executSql报错

2020-07-27 Thread godfrey he
你的包是完整的flink-1.11.1的包吗?
例如 check一下 ClusterClientJobClientAdapter 这个类是否继承 CoordinationRequestGateway
?

shimin huang  于2020年7月28日周二 上午11:21写道:

> Hi,all:
>   本人基于Flink1.11.1的table API使用Hive方言,调用executSql方法后报错,堆栈信息如下:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Failed to execute sql
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:302) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.client.program.PackagedProgram
> .invokeInteractiveModeForExecution(PackagedProgram.java:198)
> ~[flink-dist_2.
> 11-1.11.1.jar:1.11.1]
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:
> 149) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.client.deployment.application.
> DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.client.deployment.application.
> DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
> .lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.11-1.11.1
> .jar:1.11.1]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(
> CompletableFuture.java:1604) [?:1.8.0_242]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:
> 511) [?:1.8.0_242]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_242
> ]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
> .access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
> .run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_242]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
> .java:1149) [?:1.8.0_242]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
> .java:624) [?:1.8.0_242]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at org.apache.flink.table.api.internal.TableEnvironmentImpl
> .executeInternal(TableEnvironmentImpl.java:747) ~[flink-table-blink_2.11-
> 1.11.1.jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl
> .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11-
> 1.11.1.jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(
> TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.
> 0_242]
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl
> .java:62) ~[?:1.8.0_242]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> ... 13 more
> Caused by: java.lang.IllegalArgumentException: Job client must be a
> CoordinationRequestGateway. This is a bug.
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:
> 139) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher
> .setJobClient(CollectResultFetcher.java:97) ~[flink-dist_2.11-1.11.1.jar:
> 1.11.1]
> at org.apache.flink.streaming.api.operators.collect.
> CollectResultIterator.setJobClient(CollectResultIterator.java:84)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.table.planner.sinks.SelectTableSinkBase
> .setJobClient(SelectTableSinkBase.java:81) ~[flink-table-blink_2.11-1.11.1
> .jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl
> .executeInternal(TableEnvironmentImpl.java:737) ~[flink-table-blink_2.11-
> 1.11.1.jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl
> .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11-
> 1.11.1.jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(
> TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.
> 0_242]
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl
> .java:62) ~[?:1.8.0_242]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]
> at 

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 Thread godfrey he
你们是否在多线程环境下使用 TableEnvironment ?
TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。

godfrey he  于2020年7月28日周二 上午9:55写道:

> hi 能给出详细的schema信息吗?
>
> wind.fly@outlook.com  于2020年7月27日周一
> 下午7:02写道:
>
>> 补充一下,执行的sql如下:
>>
>> select order_no, order_time from
>> x.ods.ods_binlog_test_trip_create_t_order_1
>>
>> 
>> 发件人: wind.fly@outlook.com 
>> 发送时间: 2020年7月27日 18:49
>> 收件人: user-zh@flink.apache.org 
>> 主题: flink1.11.0 执行sqlQuery时报NullPointException
>>
>> Hi,all:
>>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
>> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
>> Caused by: java.lang.NullPointerException
>>   at java.util.Objects.requireNonNull(Objects.java:203)
>>   at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
>>   at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
>>   at
>> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
>>   at
>> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>>   at
>> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>>   at
>> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
>>   at
>> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
>>   at
>> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
>>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
>>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
>>   at
>> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>   at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
>>   at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
>>   at
>> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
>>   at
>> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>>   at
>> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>>   at
>> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>>   at
>> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>>   at
>> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>>   at
>> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>>   at
>> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>>   at
>> org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>>   at
>> org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>>   at
>> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>>   at
>> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>>   at
>> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
>>   at
>> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(Iden

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 Thread godfrey he
hi 能给出详细的schema信息吗?

wind.fly@outlook.com  于2020年7月27日周一 下午7:02写道:

> 补充一下,执行的sql如下:
>
> select order_no, order_time from
> x.ods.ods_binlog_test_trip_create_t_order_1
>
> 
> 发件人: wind.fly@outlook.com 
> 发送时间: 2020年7月27日 18:49
> 收件人: user-zh@flink.apache.org 
> 主题: flink1.11.0 执行sqlQuery时报NullPointException
>
> Hi,all:
>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
> Caused by: java.lang.NullPointerException
>   at java.util.Objects.requireNonNull(Objects.java:203)
>   at
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
>   at
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
>   at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
>   at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
>   at
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>   at
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>   at
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>   at
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
>   at
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
>   at
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
>   at org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>   at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>   at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
>   at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
>   at
> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
>   at
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>   at
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>   at
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>   at
> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>   at
> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>   at
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>   at
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>   at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>   at
> org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>   at
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>   at
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>   at
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
>   at
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
>   at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>   at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>   at
> 

Re: flink 1.11 ddl sql 添加PROCTIME()列,读取csv错误

2020-07-23 Thread godfrey he
和hive结合下,filesystem是支持流式读取的,可以参考 [1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/hive/hive_streaming.html#streaming-reading

Leonard Xu  于2020年7月23日周四 下午10:28写道:

> Hi,
>
> Filesystem connector 支持streaming 写入,streaming 读取
> 还未支持,所以读取完了就停止。支持streaming 写入从文档上看[1]应该是有计划的
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html
> >
>
>
> > 在 2020年7月23日,22:05,Asahi Lee <978466...@qq.com> 写道:
> >
> > 使用filesystem读取csv作为源,使用流环境,为什么我的程序一执行就停止,而不是等待文件的追加写入,继续计算呢?
> > 还是filesystem只能用于批操作?
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:
> "user-zh"
>   <
> xbjt...@gmail.com ;
> > 发送时间:2020年7月23日(星期四) 上午9:55
> > 收件人:"user-zh" user-zh@flink.apache.org>;
> >
> > 主题:Re: flink 1.11 ddl sql 添加PROCTIME()列,读取csv错误
> >
> >
> >
> > Hi, Asahi
> >
> > 这是一个已知bug[1],filesystem connector上处理计算列有点问题,已经有PR了,会在1.11.2和1.12版本上修复
> >
> >
> > Best
> > Leonard Xu
> > [1] https://issues.apache.org/jira/browse/FLINK-18665 <
> https://issues.apache.org/jira/browse/FLINK-18665> <
> https://issues.apache.org/jira/browse/FLINK-18665 <
> https://issues.apache.org/jira/browse/FLINK-18665>;
> >
> >  在 2020年7月23日,00:07,Asahi Lee <978466...@qq.com  978466...@qq.com> 写道:
> > 
> >  1. 程序
> >  StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >  nbsp; nbsp; nbsp; nbsp; EnvironmentSettings
> bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >  nbsp; nbsp; nbsp; nbsp; StreamTableEnvironment
> bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
> > 
> > 
> >  nbsp; nbsp; nbsp; nbsp; String sourceTableDDL =
> "CREATE TABLE fs_table (" +
> >  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp; "nbsp; user_id STRING," +
> >  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp; "nbsp; order_amount DOUBLE," +
> >  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp; "nbsp; dt TIMESTAMP(3)," +
> >  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp; "nbsp; pt AS PROCTIME() " +
> >  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp; " ) WITH (" +
> >  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp; "nbsp; 'connector'='filesystem'," +
> >  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp; "nbsp; 'path'='D:\\Program
> Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv',"
> +
> >  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp; "nbsp; 'format'='csv'" +
> >  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp; " )";
> > 
> > 
> >  nbsp; nbsp; nbsp; nbsp;
> bsTableEnv.executeSql(sourceTableDDL);
> >  nbsp; nbsp; nbsp; nbsp;
> bsTableEnv.executeSql("select * from fs_table").print();
> >  2. csv文件
> >  order.csv
> >  zhangsan,12.34,2020-08-03 12:23:50
> >  lisi,234.67,2020-08-03 12:25:50
> >  wangwu,57.6,2020-08-03 12:25:50
> >  zhaoliu,345,2020-08-03 12:28:50
> > 
> > 
> > 
> >  3. 错误
> >  nbsp;- Source: FileSystemTableSource(user_id, order_amount,
> dt, pt) -gt; Calc(select=[user_id, order_amount, dt,
> PROCTIME_MATERIALIZE(()) AS pt]) -gt; SinkConversionToRow (4/6)
> (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
> >  java.io.IOException: Failed to deserialize CSV row.
> >   at
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
> >   at
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
> >   at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
> >   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> >   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> >   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> >  Caused by: java.lang.RuntimeException: Row length mismatch. 4
> fields expected but was 3.
> >   at
> org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
> >   at
> org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
> >   at
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
> >   ... 5 more
>
>


Re: Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

2020-07-23 Thread godfrey he
1.10 也是支持的

Michael Ran  于2020年7月22日周三 下午9:07写道:

> 1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,2.with
> properties属性很重要 ,关系我自定义的一些参数设定。3.关于  catalog 这个东西,是不是只有1.11
> 版本才能从catalog  获取  with properties 哦? 1.10 you  有支持吗
> 在 2020-07-22 18:22:22,"godfrey he"  写道:
> >tableEnv 中 可以通过
> >tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
> >如果要拿到properties,可以通过catalog的接口得到 [1]。
> >如果要自定义实现source/sink,可以参考 [2]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
> >[2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html
> >
> >Best,
> >Godfrey
> >
> >
> >
> >
> >
> >Michael Ran  于2020年7月22日周三 下午4:10写道:
> >
> >> dear all:
> >>  我用flink 注册一张表:
> >>   CREATE TABLE dim_mysql (
> >> id int,  --
> >> type varchar --
> >> ) WITH (
> >> 'connector' = 'jdbc',
> >> 'url' = 'jdbc:mysql://localhost:3390/test',
> >> 'table-name' = 'flink_test',
> >> 'driver' = 'com.mysql.cj.jdbc.Driver',
> >> 'username' = '',
> >> 'password' = '',
> >> 'lookup.cache.max-rows' = '5000',
> >> 'lookup.cache.ttl' = '1s',
> >> 'lookup.max-retries' = '3'
> >> )
> >> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
> >> 以及属性,map 这种。
> >> 我看阿里官方有blink 支持自定义sink:
> >> publicabstractclassCustomSinkBaseimplementsSerializable{
> >> protectedMap userParamsMap;// 您在sql
> with语句中定义的键值对,但所有的键均为小写
> >> protectedSet primaryKeys;// 您定义的主键字段名
> >> protectedList headerFields;// 标记为header的字段列表
> >> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
> >> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑
>


Re: flink 1.11 ddl 写mysql的问题

2020-07-23 Thread godfrey he
你观察到有sink写不过来导致反压吗?
或者你调大flush interval试试,让每个buffer攒更多的数据

曹武 <14701319...@163.com> 于2020年7月23日周四 下午4:48写道:

> 我使用fink 1.11.1 做cdc,发现一秒钟只能写100条左右数据到mysql,请问有优化方案,或者是其他的批量写入的方案建议嘛
> 代码如下:
> String sourceDdl =" CREATE TABLE debezium_source " +
> "( " +
> "id STRING NOT NULL, name STRING, description STRING,
> weight
> Double" +
> ") " +
> "WITH (" +
> " 'connector' = 'kafka-0.11'," +
> " 'topic' = 'test0717'," +
> " 'properties.bootstrap.servers' = ' 172.22.20.206:9092',
> "
> +
> "'scan.startup.mode' =
> 'group-offsets','properties.group.id'='test'," +
> "'format' = 'debezium-json'," +
> "'debezium-json.schema-include'='false'," +
> "'debezium-json.ignore-parse-errors'='true')";
> tEnv.executeSql(sourceDdl);
> System.out.println("init source ddl successful ==>" + sourceDdl);
> String sinkDdl = " CREATE TABLE sink " +
> "( " +
> "id STRING NOT NULL," +
> " name STRING, " +
> "description STRING," +
> " weight Double," +
> " PRIMARY KEY (id) NOT ENFORCED " +
> ")" +
> " WITH " +
> "( " +
> "'connector' = 'jdbc', " +
> "'url' =
> 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " +
> "'table-name' = 'table-out', " +
> "'driver'= 'com.mysql.cj.jdbc.Driver'," +
> "'sink.buffer-flush.interval'='1s'," +
> "'sink.buffer-flush.max-rows'='1000'," +
> "'username'='DataPip', " +
> "'password'='DataPip')";
> tEnv.executeSql(sinkDdl);
> System.out.println("init sink ddl successful ==>" + sinkDdl);
>
>  String dml = "INSERT INTO sink SELECT  id,name ,description,
> weight FROM debezium_source";
> System.out.println("execute dml  ==>" + dml);
> tEnv.executeSql(dml);
> tEnv.executeSql("CREATE TABLE print_table WITH ('connector' =
> 'print')" +
> "LIKE debezium_source (EXCLUDING ALL)");
> tEnv.executeSql("INSERT INTO print_table SELECT  id,name
> ,description,  weight FROM debezium_source");
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 Thread godfrey he
hi,
目前没有解决办法,insert job根据sink表名自动生成job name。
后续解法关注 https://issues.apache.org/jira/browse/FLINK-18545

Weixubin <18925434...@163.com> 于2020年7月23日周四 下午6:07写道:

> Hi,
> 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。
> 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。
> 请问有什么解决的方法吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-08 16:07:17,"Jingsong Li"  写道:
> >Hi,
> >
> >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。
> >
> >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")"
> >并没有真正的物理节点。你不用再调用了。
> >
> >Best,
> >Jingsong
> >
> >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach  wrote:
> >
> >>
> >>
> >>
> >> 代码结构改成这样的了:
> >>
> >>
> >>
> >>
> >> val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >>
> >> val blinkEnvSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>
> >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> >> blinkEnvSettings)
> >>
> >>
> >>
> >>
> >>
> >> streamExecutionEnv.execute("from kafka sink hbase")
> >>
> >>
> >>
> >>
> >> 还是报一样的错
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-08 15:40:41,"夏帅"  写道:
> >> >你好,
> >> >可以看看你的代码结构是不是以下这种
> >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> >> >val bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
> >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> >> >  ..
> >> >tableEnv.execute("")
> >> >如果是的话,可以尝试使用bsEnv.execute("")
> >> >1.11对于两者的execute代码实现有改动
> >> >
> >> >
> >> >--
> >> >发件人:Zhou Zach 
> >> >发送时间:2020年7月8日(星期三) 15:30
> >> >收件人:Flink user-zh mailing list 
> >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming
> topology
> >> >
> >> >代码在flink
> >>
> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
> >> >Exception in thread "main" java.lang.IllegalStateException: No
> operators
> >> defined in streaming topology. Cannot generate StreamGraph.
> >> >at
> >>
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> >> >at
> >>
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> >> >at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
> >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
> >> >
> >> >
> >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
> >> >
> >> >
> >> >
> >> >
> >> >query:
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE `user` (
> >> >|uid BIGINT,
> >> >|sex VARCHAR,
> >> >|age INT,
> >> >|created_time TIMESTAMP(3),
> >> >|WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >> >|) WITH (
> >> >|'connector.type' = 'kafka',
> >> >|'connector.version' = 'universal',
> >> >|-- 'connector.topic' = 'user',
> >> >|'connector.topic' = 'user_long',
> >> >|'connector.startup-mode' = 'latest-offset',
> >> >|'connector.properties.group.id' = 'user_flink',
> >> >|'format.type' = 'json',
> >> >|'format.derive-schema' = 'true'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE user_hbase3(
> >> >|rowkey BIGINT,
> >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
> >> >|) WITH (
> >> >|'connector.type' = 'hbase',
> >> >|'connector.version' = '2.1.0',
> >> >|'connector.table-name' = 'user_hbase2',
> >> >|'connector.zookeeper.znode.parent' = '/hbase',
> >> >|'connector.write.buffer-flush.max-size' = '10mb',
> >> >|'connector.write.buffer-flush.max-rows' = '1000',
> >> >|'connector.write.buffer-flush.interval' = '2s'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|insert into user_hbase3
> >> >|SELECT uid,
> >> >|
> >> >|  ROW(sex, age, created_time ) as cf
> >> >|  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as
> >> created_time from `user`)
> >> >|
> >> >|""".stripMargin)
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


Re: flink 1.11 executeSql查询kafka表print没有输出

2020-07-23 Thread godfrey he
client端会不断的pull sink产生的数据,但是只有等checkpoint完成后,其对应的数据才能 collect() 和 print()
返回。
这是为了保证exactly once语义。
在1.12里,同时支持了at least once 和 exactly once 语义。默认情况下是 at least once,collect()
和 print() 的结果可能有重复。
如果有兴趣可以参考pr:https://github.com/apache/flink/pull/12867
<https://github.com/apache/flink/pull/12867#event-3578490750>

Best,
Godfrey

wind.fly@outlook.com  于2020年7月23日周四 下午7:34写道:

> Hi,Godfrey:
>  加了checkpoint后确实可以了,能具体讲一下原理吗?print是在完成快照的时候顺便把结果输出了吗?或者有没有相关文档?
>
> Best,
> Junbao Zhang
> ________
> 发件人: godfrey he 
> 发送时间: 2020年7月23日 19:24
> 收件人: user-zh 
> 主题: Re: flink 1.11 executeSql查询kafka表print没有输出
>
> 1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下,
> 都是exactly once语义,需要配置checkpoint才能得到结果。
>
> Best,
> Godfrey
>
> wind.fly@outlook.com  于2020年7月23日周四
> 下午7:22写道:
>
> > Hi, all:
> >
> >
> 本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的),
> > sql如下:
> >
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings settings =
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> settings);
> >
> > Catalog catalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
> > tEnv.registerCatalog("x", catalog);
> >
> > TableResult execute = tEnv.executeSql("select * from
> > x.ods.ods_binlog_test_trip_create_t_order_1");
> >
> > execute.print();
> >
> > 建表语句如下:
> >
> > CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
> >   _type STRING,
> >   order_no STRING,
> >   order_time STRING,
> >   dt as TO_TIMESTAMP(order_time),
> >   proctime as PROCTIME(),
> >   WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.properties.bootstrap.servers' = '***',
> >   'connector.properties.zookeeper.connect' = '',
> >   'connector.version' = 'universal',
> >   'format.type' = 'json',
> >   'connector.properties.group.id' = 'testGroup',
> >   'connector.startup-mode' = 'group-offsets',
> >   'connector.topic' = 'test'
> > )
> >
>


Re: flink 1.11 executeSql查询kafka表print没有输出

2020-07-23 Thread godfrey he
1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下,
都是exactly once语义,需要配置checkpoint才能得到结果。

Best,
Godfrey

wind.fly@outlook.com  于2020年7月23日周四 下午7:22写道:

> Hi, all:
>
> 本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的),
> sql如下:
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>
> Catalog catalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
> tEnv.registerCatalog("x", catalog);
>
> TableResult execute = tEnv.executeSql("select * from
> x.ods.ods_binlog_test_trip_create_t_order_1");
>
> execute.print();
>
> 建表语句如下:
>
> CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
>   _type STRING,
>   order_no STRING,
>   order_time STRING,
>   dt as TO_TIMESTAMP(order_time),
>   proctime as PROCTIME(),
>   WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.properties.bootstrap.servers' = '***',
>   'connector.properties.zookeeper.connect' = '',
>   'connector.version' = 'universal',
>   'format.type' = 'json',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'group-offsets',
>   'connector.topic' = 'test'
> )
>


Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 Thread godfrey he
这个问题的已经有一个issue:https://issues.apache.org/jira/browse/FLINK-18545,请关注

WeiXubin <18925434...@163.com> 于2020年7月23日周四 下午6:00写道:

> Hi,
> 我想请问下使用 streamExecutionEnv.execute("from kafka sink
> hbase"),通过这种方式可以给Job指定名称。
> 而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。
> 请问有什么解决方案吗?谢谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

2020-07-22 Thread godfrey he
tableEnv 中 可以通过
tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
如果要拿到properties,可以通过catalog的接口得到 [1]。
如果要自定义实现source/sink,可以参考 [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html

Best,
Godfrey





Michael Ran  于2020年7月22日周三 下午4:10写道:

> dear all:
>  我用flink 注册一张表:
>   CREATE TABLE dim_mysql (
> id int,  --
> type varchar --
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3390/test',
> 'table-name' = 'flink_test',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = '',
> 'password' = '',
> 'lookup.cache.max-rows' = '5000',
> 'lookup.cache.ttl' = '1s',
> 'lookup.max-retries' = '3'
> )
> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
> 以及属性,map 这种。
> 我看阿里官方有blink 支持自定义sink:
> publicabstractclassCustomSinkBaseimplementsSerializable{
> protectedMap userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写
> protectedSet primaryKeys;// 您定义的主键字段名
> protectedList headerFields;// 标记为header的字段列表
> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑


Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 Thread godfrey he
Hi,首维

感谢给出非常详细的反馈。这个问题我们之前内部也有一些讨论,但由于缺乏一些真实场景,最后维持了当前的接口。
我们会根据你提供的场景进行后续讨论。

Best,
Godfrey

刘首维  于2020年7月22日周三 下午5:23写道:

> Hi, Jark
>
>
>
>感谢你的建议!
>
>我们这边充分相信社区在SQL/Table上的苦心孤诣,也愿意跟进Flink在新版本的变化。
>
>先看我遇到问题本身,你的建议确实可以帮助我解决问题。我想聊一下我对问题之外的一些想法
>
>```
>
>  >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter
> 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
>   ```
>
>  
> 比如上述这个问题2,我们确实可以把它做到SinkFunction中,但是我个人认为这可能在设计上不够理想的。我个人在设计编排Function/算子的时候习惯于遵循”算子单一职责”的原则,这也是我为什么会拆分出多个process/filter算子编排到SinkFunction前面而非将这些功能耦合到SinkFunction去做。另一方面,没了DataStream,向新的API的迁移成本相对来说变得更高了一些~
> 又或者,我们现在还有一些特殊原因,算子编排的时候会去修改TaskChain Strategy,这个时候DataStream的灵活性是必不可少的
>
> 考虑到Flink Task都可以拆分成Source -> Transformation -> sink
> 三个阶段,那么能让用户可以对自己的作业针对(流或批)的运行模式下,可以有效灵活做一些自己的定制策略/优化/逻辑可能是会方便的~
>
>诚然,DataStream的灵活性确实会是一把双刃剑,但就像@leonard提到的,平台层和应用层的目的和开发重点可能也不太一样,对Flink
> API使用侧重点也不同。我个人还是希望可以在享受全新API设计优势同时,
>
> 可以继续使用DataStream(Transformation)的灵活性,助力Flink组件在我们组的开落地
>
>
> 再次感谢各位的回复!
>
> 
> 发件人: Jark Wu 
> 发送时间: 2020年7月22日 16:33:45
> 收件人: user-zh
> 抄送: godfrey he; greemqq...@163.com; 刘首维
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> Hi,首维,
>
> 非常感谢反馈。与 DataStream 解耦是 FLIP-95 的一个非常重要的设计目标,这让 sink/source 对于框架来说不再是黑盒,
> 因此将来才可以做诸如 state 兼容升级、消息顺序保障、自动并发设置等等事情。
>
> 关于你的一些需求,下面是我的建议和回复:
>
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> 这个理论上还属于“数据格式”的职责,所以建议做在 DeserializationSchema 上,目前 DeserializationSchema
> 支持一对多的输出。可以参考 DebeziumJsonDeserializationSchema 的实现。
>
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> 这个社区也有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。
>
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> 这个能在具体一点吗?目前像 SupportsPartitioning 接口,就可以指定数据在交给 sink 之前先做 group by
> partition。我感觉这个可能也可以通过引入类似的接口解决。
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 16:27, Leonard Xu  xbjt...@gmail.com>> wrote:
> Hi,首维, Ran
>
> 感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净,
> 但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
> 我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey
>
> 祝好
> Leonard Xu
>
>
> > 在 2020年7月22日,13:47,刘首维  liushou...@autohome.com.cn>> 写道:
> >
> > Hi JingSong,
> >
> >
> 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL
> SDK
> >  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
> >
> >
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> >
> >
> > 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
> >
> >
> > 
> > 发件人: Jingsong Li mailto:jingsongl...@gmail.com>>
> > 发送时间: 2020年7月22日 13:26:00
> > 收件人: user-zh
> > 抄送: imj...@gmail.com<mailto:imj...@gmail.com>
> > 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
> >
> > 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
> >
> > Best
> > Jingsong
> >
> > On Wed, Jul 22, 2020 at 12:36 PM 刘首维  liushou...@autohome.com.cn>> wrote:
> >
> >> Hi all,
> >>
> >>
> >>
> >>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
> >>
> >>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> >>
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
> >>
> >>
> >>
> >>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>
>


Re: Flink catalog的几个疑问

2020-07-21 Thread godfrey he
hi Xingxing,

1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
postgres catalog,
可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
catalog写新的meta。
是否会转为默认catalog,据我所知,目前没有。
3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。

Best,
Godfrey

dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:

> Hi Flink社区:
> 有几个疑问希望社区小伙伴们帮忙解答一下:
>
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
>
>
>
>
> Best,
> Xingxing Di
>


Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 Thread godfrey he
sql-client.sh的-u是指update语句,目前只支持insert。

Jark Wu  于2020年7月21日周二 下午6:47写道:

> Hi,
>
> 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。
> 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。
> 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.
>
> Best,
> Jark
>
> On Thu, 16 Jul 2020 at 19:43, Harold.Miao  wrote:
>
> > hi flink users
> >
> > 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
> > 这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


Re: flink1.11 tablefunction

2020-07-21 Thread godfrey he
可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide


Dream-底限  于2020年7月21日周二 下午7:25写道:

> hi
>
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
>


Re: flink table同时作为写出及输入时下游无数据

2020-07-21 Thread godfrey he
你可以先只跑第一个insert,然后check一下g_sink_unit是否有数据。

另外,你可以把query 改为都读取 kafka_source_tab 再分别写到两个不同的sink:

sql1='''Insert into g_sink_unit select alarm_id,trck_id from
kafka_source_tab'''
sql2='''Insert into g_summary_base select alarm_id,trck_id from
kafka_source_tab;'''

小学生 <201782...@qq.com> 于2020年7月21日周二 下午5:47写道:

>
> 各位大佬好,请教一个问题,就是在flink内部定义一个表g_unit(初始为空),接受一个kafka源的写入,同时g_unit又要作为下游表g_summary的输入源,测试发现g_line表一直不会写入数据,代码如下,烦请大佬解答。
>
>
>
>
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>
>
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> env.set_parallelism(1)
> env_settings =
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> t_env = StreamTableEnvironment.create(env,
> environment_settings=env_settings)
>
>
>
> kafka_source_ddl = """
> CREATE TABLE kafka_source_tab (
> id VARCHAR, 
> alarm_id VARCHAR, 
> trck_id VARCHAR
>
>
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'gg', 
> 'scan.startup.mode' = 'specific-offsets',
> 'scan.startup.specific-offsets'='partition:1,offset:0',
> 'properties.bootstrap.servers' = '',
> 'format' = 'json'
> )
> """
> g_unit_sink_ddl = """
> CREATE TABLE g_sink_unit (
> alarm_id VARCHAR, 
> trck_id VARCHAR
> 
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
> 'table-name' = 'g_unit', 
> 'username' = 'root',
> 'password' = 'root',
> 'sink.buffer-flush.interval' = '1s'  
> )
> """
> g_summary_ddl = """
> CREATE TABLE g_summary_base(
> alarm_id VARCHAR, 
> trck_id VARCHAR
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
> 'table-name' = 'g_summary',
> 'username' = 'root',
> 'password' = 'root',
> 'sink.buffer-flush.interval' = '1s'
> )
> """
>
> t_env.execute_sql(kafka_source_ddl)
> t_env.execute_sql(g_unit_sink_ddl)
> t_env.execute_sql(g_summary_ddl)
>
>
> sql1='''Insert into g_sink_unitselect alarm_id,trck_id from
> kafka_source_tab'''
> sql2='''Insert into g_summary_baseselect alarm_id,trck_id from
> g_sink_unit'''
>
>
>
> stmt_set = t_env.create_statement_set()
> stmt_set.add_insert_sql(sql1)
> stmt_set.add_insert_sql(sql2)
>
>
> stmt_set.execute().get_job_client().get_job_execution_result().result()


Re: flink 1.11 cdc: kafka中存了canal-json格式的多张表信息,需要按表解析做处理,sink至不同的下游,要怎么支持?

2020-07-21 Thread godfrey he
http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html
 这个邮件里提到了类似的问题。

https://issues.apache.org/jira/browse/FLINK-18002 这个issue完成后(1.12),你可以将
“data”,“mysqlType”等格式不确定的字段定义为String类型,
下游通过udf自己再解析对应的json


Best,
Godfrey

jindy_liu <286729...@qq.com> 于2020年7月21日周二 下午12:37写道:

> 例如:
>
> mysql表:
> CREATE TABLE `test` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   `time` datetime NOT NULL,
>   `status` int(11) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> CREATE TABLE `status` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> kafka中数据:
> // 表test 中insert事件
> {"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
>
> 18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}
>
> //表status 中的事件
>
> {"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}
>
> 如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData,
> 感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
JIRA: https://issues.apache.org/jira/browse/FLINK-18651

godfrey he  于2020年7月21日周二 上午9:46写道:

> hi  Kelly,
> As the exception message mentioned: currently, we must cast the time
> attribute to regular TIMESTAMP type,
> then we can do regular join. Because time attribute will be out-of-order
> after regular join,
> and then we can't do window aggregate based on the time attribute.
>
> We can improve it that the planner implicitly casts the time attribute to
> regular TIMESTAMP type,
> and throws exception there is an operator (after join) depended on time
> attribute, like window aggregate.
>
> I will create a JIRA to trace this.
>
> Best,
> Godfrey
>
> Kelly Smith  于2020年7月21日周二 上午6:38写道:
>
>> Hi folks,
>>
>>
>>
>> I have a question Flink SQL. What I want to do is this:
>>
>>
>>
>>- Join a simple lookup table (a few rows) to a stream of data to
>>enrich the stream by adding a column from the lookup table.
>>
>>
>>
>>
>>
>> For example, a simple lookup table:
>>
>>
>>
>> *CREATE TABLE *LookupTable (
>> *`computeClass`  *STRING,
>> *`multiplier`*
>> *FLOAT *) *WITH *(
>> *'connector' *= *'filesystem'*,
>> *'path' *= *'fpu-multipliers.csv'*,
>> *'format' *=
>> *'csv' *)
>>
>>
>>
>>
>>
>> And I’ve got a Kafka connector table with rowtime semantics that has a
>> `computeClass` field. I simply want to join (in a streaming fashion) the
>> `multiplier` field above.
>>
>>
>>
>>
>> *SELECT*`timestamp`,
>>
>> // ...
>> ks.computeClass,
>> lt.`multiplier`
>> *FROM *KafkaStream ks
>>
>> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>>
>>
>>
>> Doing a simple join like that gives me this error:
>>
>>
>>
>> “org.apache.flink.table.api.TableException: Rowtime attributes must not
>> be in the input rows of a regular join. As a workaround you can cast the
>> time attributes of input tables to TIMESTAMP before.”
>>
>>
>>
>> Which leads me to believe that I should use an Interval Join instead, but
>> that doesn’t seem to be appropriate since my table is static and has no
>> concept of time. Basically, I want to hold the entire lookup table in
>> memory, and simply enrich the Kafka stream (which need not be held in
>> memory).
>>
>>
>>
>> Any ideas on how to accomplish what I’m trying to do?
>>
>>
>>
>> Thanks!
>>
>> Kelly
>>
>


Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
hi  Kelly,
As the exception message mentioned: currently, we must cast the time
attribute to regular TIMESTAMP type,
then we can do regular join. Because time attribute will be out-of-order
after regular join,
and then we can't do window aggregate based on the time attribute.

We can improve it that the planner implicitly casts the time attribute to
regular TIMESTAMP type,
and throws exception there is an operator (after join) depended on time
attribute, like window aggregate.

I will create a JIRA to trace this.

Best,
Godfrey

Kelly Smith  于2020年7月21日周二 上午6:38写道:

> Hi folks,
>
>
>
> I have a question Flink SQL. What I want to do is this:
>
>
>
>- Join a simple lookup table (a few rows) to a stream of data to
>enrich the stream by adding a column from the lookup table.
>
>
>
>
>
> For example, a simple lookup table:
>
>
>
> *CREATE TABLE *LookupTable (
> *`computeClass`  *STRING,
> *`multiplier`*
> *FLOAT *) *WITH *(
> *'connector' *= *'filesystem'*,
> *'path' *= *'fpu-multipliers.csv'*,
> *'format' *=
> *'csv' *)
>
>
>
>
>
> And I’ve got a Kafka connector table with rowtime semantics that has a
> `computeClass` field. I simply want to join (in a streaming fashion) the
> `multiplier` field above.
>
>
>
>
> *SELECT*`timestamp`,
>
> // ...
> ks.computeClass,
> lt.`multiplier`
> *FROM *KafkaStream ks
>
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>
>
>
> Doing a simple join like that gives me this error:
>
>
>
> “org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.”
>
>
>
> Which leads me to believe that I should use an Interval Join instead, but
> that doesn’t seem to be appropriate since my table is static and has no
> concept of time. Basically, I want to hold the entire lookup table in
> memory, and simply enrich the Kafka stream (which need not be held in
> memory).
>
>
>
> Any ideas on how to accomplish what I’m trying to do?
>
>
>
> Thanks!
>
> Kelly
>


Re: SQL 报错只有 flink runtime 的 NPE

2020-07-20 Thread godfrey he
看不到图片信息,换一个图床工具上传图片吧

Luan Cooper  于2020年7月17日周五 下午4:11写道:

> 附一个 Job Graph 信息,在 Cal 处挂了
> [image: image.png]
>
> On Fri, Jul 17, 2020 at 4:01 PM Luan Cooper  wrote:
>
>> 实际有 20 左右个字段,用到的 UDF 有 COALESCE / CAST / JSON_PATH / TIMESTAMP 类
>> *是指 UDF 返回了 NULL 导致的吗?*
>>
>>
>> On Fri, Jul 17, 2020 at 2:54 PM godfrey he  wrote:
>>
>>> udf_xxx的逻辑是啥?
>>>
>>>
>>> Luan Cooper  于2020年7月17日周五 下午2:40写道:
>>>
>>> > Hi
>>> >
>>> > 我有这么一个 SQL
>>> > INSERT INTO es
>>> > SELECT
>>> > a,
>>> > udf_xxx(b)
>>> > FROM mongo_oplog -- 自定义 TableFactory
>>> >
>>> > Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码
>>> Exception,可以稳定重现
>>> >
>>> > LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
>>> > (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.
>>> >
>>> > java.lang.NullPointerException
>>> >
>>> > at StreamExecCalc$8016.split$7938$(Unknown Source)
>>> >
>>> > at StreamExecCalc$8016.processElement(Unknown Source)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>>> >
>>> > at
>>> > org.apache.flink.streaming.runtime.io
>>> > .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>> >
>>> > at
>>> > org.apache.flink.streaming.runtime.io
>>> > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>> >
>>> > at
>>> > org.apache.flink.streaming.runtime.io
>>> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>> >
>>> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>> >
>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>> >
>>> > at java.lang.Thread.run(Thread.java:748)
>>> >
>>> > 请问这种怎样情况排查问题?
>>> > 有任何线索都可以
>>> >
>>> > 感谢
>>> >
>>>
>>


Re: flink sink到kafka

2020-07-19 Thread godfrey he
如果你是用flink sql的,可以通过DDL的方式来定义kafka sink,参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
Godfrey

smq <374060...@qq.com> 于2020年7月19日周日 下午9:36写道:

> 大家好,我想通过avro格式sink到kafka,请问该怎么实现,官网上没找到相关方法。


Re: Flink 1.11 Sql client environment yaml

2020-07-18 Thread godfrey he
hi

GenericInMemoryCatalog does not support settings now,
or you can refer to [1] for supported catalog details
and you can refer to [2] to supported types details.

"Kafka schema registry for schema" is under discussion [3],
which can be ready in 1.12.

sql client supports DDL to create a table with json format [4],
you can use ROW type to define nested json.
for example:

create table my_table (
  f varchar,
  nest_column row<
a varchar,
b int,
c int
  >
) with (
...
)

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html#catalogs
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html
[3] https://issues.apache.org/jira/browse/FLINK-16048
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#how-to-create-a-table-with-json-format

Best,
Godfrey


Lian Jiang  于2020年7月18日周六 上午6:28写道:

> Hi,
>
> I am experimenting Flink SQL by following
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html.
> I want to set up an environment yaml to query Kafka data (json in avro
> format). Where can I find the information below?
>
> 1. use GenericInMemoryCatalog (e.g. type, settings)
> 2. use Kafka schema registry for schema. The example hard code the schema
> in env yaml.
> 3. other than UDF, is there a way to easily query a deeply nested json in
> Flink SQL?
>
> Appreciate your help!
>
> Regards
> Lian
>
>
>
>
>


Re: FlinkSQL 任务提交后 任务名称问题

2020-07-18 Thread godfrey he
hi Evan,
感谢反馈,目前已经有一个issue [1]在跟踪该问题,可以关注后续进展

[1] https://issues.apache.org/jira/browse/FLINK-18545


Best,
Godfrey

Jeff Zhang  于2020年7月18日周六 下午9:52写道:

> 在zeppelin中你可以指定insert 语句的job name,如下图,(对Zeppelin感兴趣的,可以加入钉钉群:32803524)
>
> %flink.ssql(jobName="my job")
>
> insert into sink_kafka select status, direction, cast(event_ts/10
> as timestamp(3)) from source_kafka where status <> 'foo'
>
> [image: image.png]
>
> Evan  于2020年7月18日周六 下午5:47写道:
>
>> 代码大概是这样子的,一张kafka source表,一张es Sink表,最后通过tableEnv.executeSql("insert into
>> esSinkTable select ... from kafkaSourceTable")执行
>> 任务提交后任务名称为“inset-into_某某catalog_某某database.某某Table”
>>
>>
>> 这样很不友好啊,能不能我自己指定任务名称呢?
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Re: Re:Re: flink 1.11任务提交的问题

2020-07-17 Thread godfrey he
是的。目前按照你的写法做不到只提交一个job了

sunfulin  于2020年7月17日周五 下午3:11写道:

>
>
>
> hi,
> 再问下,这个方案还是会提交两个job吧?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 14:36:19,"godfrey he"  写道:
> >做不到,1.11里把 StreamExecutionEnvironment.execute 和
> >StreamTableEnvironment.execute 的逻辑已经切分干净了。
> >有个改动比较小的方案可以参考:可以在原来的逻辑的基础上,把两种提交job的方式放到两个不同的类中,其他的逻辑放到另外一个类共性。
> >
> >sunfulin  于2020年7月17日周五 下午2:00写道:
> >
> >> hi,
> >> 补充一下,1.10版本的代码使用sqlUpdate +
> >>
> table2datastream,并通过StreamExecutionEnvironment.execute来提交。我回滚到1.10版本的代码后,因为我看1.11版本里如果使用sqlUpdate执行insertInto,必须使用StreamTableEnvironment.execute来提交。现在我的问题就是这个:我想通过一个job来提交。现在有机制可以做不?在1.11版本里执行。因为之前的job逻辑较为复杂,做拆分的话还有点麻烦。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-17 13:55:21,"sunfulin"  写道:
> >>
> >>
> >>
> >>
> >> hi,
> >>
> 感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
> >> to DataStream的语句不会生成拓扑。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-17 12:09:20,"godfrey he"  写道:
> >> >hi sunfulin,
> >> >目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
> >> >即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
> >> >虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
> >> >
> >> >Best,
> >> >Godfrey
> >> >
> >> >Leonard Xu  于2020年7月17日周五 上午12:12写道:
> >> >
> >> >> Hi,
> >> >>
> >> >> 我理解目前好像做不到, cc: godfrey 大佬看看
> >> >>
> >> >> 祝好,
> >> >> Leonard Xu
> >> >>
> >> >> > 在 2020年7月16日,23:08,sunfulin  写道:
> >> >> >
> >> >> > hi,
> >> >> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
> >> >> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> >> >> > 通过StreamExecutionEnvironment.execute提交,yarn
> >> >> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
> >> >>
> >> >>
> >>
> >>
> >>
> >>
> >>
> >>
>


Re: flink-1.11 DDL 写入hdfs问题 Cannot instantiate user function

2020-07-17 Thread godfrey he
第二个问题的异常栈是啥?

kcz <573693...@qq.com> 于2020年7月17日周五 下午2:17写道:

> 第一个bug提示只需要
> classloader.resolve-order: parent-first
> 第二个bug采用了parquet还没解决
>
>
> --原始邮件--
> 发件人:
>   "kcz"
> <
> 573693...@qq.com;
> 发送时间:2020年7月17日(星期五) 中午1:32
> 收件人:"user-zh"
> 主题:flink-1.11 DDL 写入hdfs问题 Cannot instantiate user function
>
>
>
> standalone
> lib jar包如下
> flink-connector-hive_2.11-1.11.0.jar   
> flink-json-1.11.0.jar  
> 
> flink-sql-connector-kafka_2.12-1.11.0.jar log4j-api-2.12.1.jar
> flink-csv-1.11.0.jar  
>  flink-parquet_2.11-1.11.0.jar
>   
> flink-table_2.11-1.11.0.jar  
>  log4j-core-2.12.1.jar
> flink-dist_2.11-1.11.0.jar  
>   flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar
> flink-table-blink_2.11-1.11.0.jar
> log4j-slf4j-impl-2.12.1.jar
> flink-hadoop-compatibility_2.11-1.11.0.jar
> flink-shaded-zookeeper-3.4.14.jar 
> log4j-1.2-api-2.12.1.jar
>
>
>
>
>
> 代码如下:idea下不报错
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> env.setParallelism(1);
> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> // 同一时间只允许进行一个检查点
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);;
> env.setStateBackend(new FsStateBackend(path));
>
> tableEnv.executeSql("CREATE TABLE source_table (\n" +
> "\thost STRING,\n" +
> "\turl STRING,\n" +
> "\tpublic_date STRING\n" +
> ") WITH (\n" +
> "\t'connector.type' = 'kafka',\n" +
> "\t'connector.version' = 'universal',\n" +
> "\t'connector.startup-mode' = 'latest-offset',\n" +
> "\t'connector.topic' = 'test_flink_1.11',\n" +
> "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
> "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n"
> +
> "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n"
> +
> "\t'update-mode' = 'append',\n" +
> "\t'format.type' = 'json',\n" +
> "\t'format.derive-schema' = 'true'\n" +
> ")");
>
> tableEnv.executeSql("CREATE TABLE fs_table (\n" +
> "  host STRING,\n" +
> "  url STRING,\n" +
> "  public_date STRING\n" +
> ") PARTITIONED BY (public_date) WITH (\n" +
> "  'connector'='filesystem',\n" +
> "  'path'='path',\n" +
> "  'format'='json',\n" +
> "  'sink.partition-commit.delay'='0s',\n" +
> "  'sink.partition-commit.policy.kind'='success-file'\n" +
> ")");
>
> tableEnv.executeSql("INSERT INTO  fs_table SELECT host, url,
> DATE_FORMAT(public_date, '-MM-dd') FROM source_table");
> TableResult result = tableEnv.executeSql("SELECT * FROM fs_table ");
> result.print();
> 报错如下
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
>  at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.apache.commons.collections.map.LinkedMap to field
>
>
> 第二个bug
> sink到hdfs时候,采用parquet时候,lib下面有parquet包,pom里面是provided,但是会提示这个error,也试过pom里面不是provided,还是不OK


Re: SQL 报错只有 flink runtime 的 NPE

2020-07-17 Thread godfrey he
udf_xxx的逻辑是啥?


Luan Cooper  于2020年7月17日周五 下午2:40写道:

> Hi
>
> 我有这么一个 SQL
> INSERT INTO es
> SELECT
> a,
> udf_xxx(b)
> FROM mongo_oplog -- 自定义 TableFactory
>
> Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码 Exception,可以稳定重现
>
> LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
> (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.
>
> java.lang.NullPointerException
>
> at StreamExecCalc$8016.split$7938$(Unknown Source)
>
> at StreamExecCalc$8016.processElement(Unknown Source)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>
> at
> org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>
> at
> org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>
> at
> org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> 请问这种怎样情况排查问题?
> 有任何线索都可以
>
> 感谢
>


Re: Re:Re: flink 1.11任务提交的问题

2020-07-17 Thread godfrey he
做不到,1.11里把 StreamExecutionEnvironment.execute 和
StreamTableEnvironment.execute 的逻辑已经切分干净了。
有个改动比较小的方案可以参考:可以在原来的逻辑的基础上,把两种提交job的方式放到两个不同的类中,其他的逻辑放到另外一个类共性。

sunfulin  于2020年7月17日周五 下午2:00写道:

> hi,
> 补充一下,1.10版本的代码使用sqlUpdate +
> table2datastream,并通过StreamExecutionEnvironment.execute来提交。我回滚到1.10版本的代码后,因为我看1.11版本里如果使用sqlUpdate执行insertInto,必须使用StreamTableEnvironment.execute来提交。现在我的问题就是这个:我想通过一个job来提交。现在有机制可以做不?在1.11版本里执行。因为之前的job逻辑较为复杂,做拆分的话还有点麻烦。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 13:55:21,"sunfulin"  写道:
>
>
>
>
> hi,
> 感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
> to DataStream的语句不会生成拓扑。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 12:09:20,"godfrey he"  写道:
> >hi sunfulin,
> >目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
> >即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
> >虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
> >
> >Best,
> >Godfrey
> >
> >Leonard Xu  于2020年7月17日周五 上午12:12写道:
> >
> >> Hi,
> >>
> >> 我理解目前好像做不到, cc: godfrey 大佬看看
> >>
> >> 祝好,
> >> Leonard Xu
> >>
> >> > 在 2020年7月16日,23:08,sunfulin  写道:
> >> >
> >> > hi,
> >> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
> >> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> >> > 通过StreamExecutionEnvironment.execute提交,yarn
> >> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
> >>
> >>
>
>
>
>
>
>


Re: flink 1.11任务提交的问题

2020-07-16 Thread godfrey he
hi sunfulin,
目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。

Best,
Godfrey

Leonard Xu  于2020年7月17日周五 上午12:12写道:

> Hi,
>
> 我理解目前好像做不到, cc: godfrey 大佬看看
>
> 祝好,
> Leonard Xu
>
> > 在 2020年7月16日,23:08,sunfulin  写道:
> >
> > hi,
> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> > 通过StreamExecutionEnvironment.execute提交,yarn
> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>
>


Re: flink 1.11 checkpoint使用

2020-07-16 Thread godfrey he
为什么要 GROUP BY id,name ,description, weight ?
直接 "INSERT INTO sink SELECT  id,name ,description, weight FROM
debezium_source" 不能满足需求?

曹武 <14701319...@163.com> 于2020年7月16日周四 下午9:30写道:

> 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
> 从checkpoint恢复以后,新来op=d的数据会删除失败
> 重启命令:./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
>
> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
> 代码:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //
> 最大允许同时出现几个CheckPoint
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); //
> 最小得间隔时间
> env.getCheckpointConfig().setPreferCheckpointForRecovery(true); //
> 是否倾向于用CheckPoint做故障恢复
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
> //
> 容忍多少次CheckPoint失败
> //Checkpoint文件清理策略
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> //Checkpoint外部文件路径
> env.setStateBackend(new FsStateBackend(new
> URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
> TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> settings);
> String sourceDDL = String.format(
> "CREATE TABLE debezium_source (" +
> " id INT NOT NULL," +
> " name STRING," +
> " description STRING," +
> " weight Double" +
> ") WITH (" +
> " 'connector' = 'kafka-0.11'," +
> " 'topic' = '%s'," +
> " 'properties.bootstrap.servers' = '%s'," +
> " 'scan.startup.mode' = 'group-offsets'," +
> " 'format' = 'debezium-json'" +
> ")", "ddd", " 172.22.20.206:9092");
> String sinkDDL = "CREATE TABLE sink (" +
> " id INT NOT NULL," +
> " name STRING," +
> " description STRING," +
> " weight Double," +
> " PRIMARY KEY (id,name, description,weight) NOT ENFORCED "
> +
> ") WITH (" +
> " 'connector' = 'jdbc'," +
> " 'url' =
> 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
> " 'table-name' = 'products'," +
> " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
> " 'username'='DataPip'," +
> " 'password'='DataPip'" +
> ")";
> String dml = "INSERT INTO sink SELECT  id,name ,description, weight
> FROM debezium_source GROUP BY id,name ,description, weight";
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> tEnv.executeSql(dml);
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread godfrey he
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors

Best,
Godfrey

wangl...@geekplus.com.cn  于2020年7月16日周四 下午4:02写道:

> 比如:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> 最终解析 debezium-json 应该是
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> 下面的代码
> 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>


Re: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval

2020-07-15 Thread godfrey he
现在还不支持在sql-client-defaults.yaml 里配置 checkpointing.interval,
你可以配置在flink-conf.yaml里

Harold.Miao  于2020年7月16日周四 下午1:27写道:

> hi flink users
>
> 通过sql-client提交sql怎么设置checkpointing.interval?
> 我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
> 谢谢
>
>
>
> --
>
> Best Regards,
> Harold Miao
>


Re: Flink-1.11内置connector测试问题求解

2020-07-15 Thread godfrey he
目前 1.11 版本中的 tableResult.print 只支持 exactly once 语义,需要配置 checkpoint。

1.12 里准备支持 at least once 语义,用户可以不用配置 checkpoint。目前 pr [1] 正在reivew 。

[1] https://github.com/apache/flink/pull/12867

Best,
Godfrey

Jingsong Li  于2020年7月16日周四 上午11:36写道:

>  tableResult.print需要有checkpoint
>
> Best,
> Jingsong
>
> On Thu, Jul 16, 2020 at 11:31 AM amen...@163.com  wrote:
>
> > hi, everyone
> >
> > 小白在测试flink
> >
> 1.11新特性新内置的三个connector时,在本地创建图片[1]中的任务并进行数据打印时,控制台只打印了表schema,而没有按内置的datagen
> > connector规则产生数据,请问可能是什么原因呢?谢谢解答!
> >
> >
> > [1] https://postimg.cc/PprT9XV6
> >
> > best,
> > amenhub
> >
> >
> >
> > amen...@163.com
> >
>
>
> --
> Best, Jingsong Lee
>


Re: Parquet format in Flink 1.11

2020-07-15 Thread godfrey he
hi Flavio,

Parquet format supports configuration from ParquetOutputFormat
.
please
refer to [1] for details

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/parquet.html#format-options

Best,
Godfrey



Flavio Pompermaier  于2020年7月15日周三 下午8:44写道:

> Hi to all,
> in my current code I use the legacy Hadoop Output format to write my
> Parquet files.
> I wanted to use the new Parquet format of Flink 1.11 but I can't find how
> to migrate the following properties:
>
> ParquetOutputFormat.setBlockSize(job, parquetBlockSize);
> ParquetOutputFormat.setEnableDictionary(job, true);
> ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
>
> Is there a way to set those configs?
> And if not, is there a way to handle them without modifying the source of
> the flink connector (i.e. extending some class)?
>
> Best,
> Flavio
>


Re: flink 1.11运算结果存mysql出错

2020-07-13 Thread godfrey he
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

TableEnvironment.executeSql() 和 StatementSet.execute()
提交的作业都是异步的,如果是在本地测试的话,不会等有最终结果才会推出。针对这个问题,1.12里准备引入 await 方法
[3],代码还在review中。

TableResult是用来描述一个statement执行的结果。对于SELECT和INSERT,TableResult中还包含了JobClient
[4]
用来操作对应的job,例如获取job状态,cancel作业,等待作业结束等。TableResult还可以collect方法拿到statement执行的schema和结果数据,例如
select/show的结果。


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
[3] https://issues.apache.org/jira/browse/FLINK-18337
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API

Best,
Godfrey


小学生 <201782...@qq.com> 于2020年7月13日周一 下午9:12写道:

> 嗯嗯,尝试了,这下没问题了,想问下这个TableResult对象,设计的目的是啥呢,不是特别懂呢,谢谢!
>


Re: Table API jobs migration to Flink 1.11

2020-07-12 Thread godfrey he
hi Flavio,

`BatchTableSource` can only be used for old planner.
if you want to use Blink planner to run batch job,
your table source should implement `StreamTableSource`
and `isBounded` method return true.

Best,
Godfrey



Flavio Pompermaier  于2020年7月10日周五 下午10:32写道:

> Is it correct to do something like this?
>
> TableSource myTableSource = new BatchTableSource() {
>   @Override
>   public TableSchema getTableSchema() {
> return new TableSchema(dsFields, ft);
>   }
>   @Override
>   public DataSet getDataSet(ExecutionEnvironment execEnv) {
> return execEnv.createInput(myInputformat);
>   }
> };
>
> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier 
> wrote:
>
>> How can you reuse InputFormat to write a TableSource? I think that at
>> least initially this could be the simplest way to test the migration..then
>> I could try yo implement the new Table Source interface
>>
>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he  wrote:
>>
>>> hi Flavio,
>>> Only old planner supports BatchTableEnvironment (which can convert
>>> to/from DataSet),
>>> while Blink planner in batch mode only support TableEnvironment. Because
>>> Blink planner
>>> convert the batch queries to Transformation (corresponding to
>>> DataStream), instead of DataSet.
>>>
>>> one approach is you can migrate them to TableSource instead (InputFormat
>>> can be reused),
>>> but TableSource will be deprecated later. you can try new table source[1]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>
>>> Best,
>>> Godfrey
>>>
>>> Flavio Pompermaier  于2020年7月10日周五 下午8:54写道:
>>>
>>>> Thanks but I still can't understand how to migrate my legacy code. The
>>>> main problem is that I can't create a BatchTableEnv anymore so I can't
>>>> call createInput.
>>>>
>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>> TableSource instead?
>>>>
>>>> public static void main(String[] args) throws Exception {
>>>> ExecutionEnvironment env =
>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>> BatchTableEnvironment btEnv =
>>>> TableEnvironment.getTableEnvironment(env);
>>>> MyInputFormat myInputformat =  new MyInputFormat(dsFields,
>>>> ft).finish();
>>>> DataSet rows = env.createInput(myInputformat);
>>>> Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>> CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t",
>>>> 1, WriteMode.OVERWRITE);
>>>> btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>> btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>> env.execute();
>>>>   }
>>>>
>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>> dwysakow...@apache.org> wrote:
>>>>
>>>>> You should be good with using the TableEnvironment. The
>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>> DataStream. We do not support converting batch Table programs to
>>>>> DataStream yet.
>>>>>
>>>>> A following code should work:
>>>>>
>>>>> EnvironmentSettings settings =
>>>>> EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>
>>>>> TableEnvironment.create(settings);
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>> > Hi to all,
>>>>> > I was trying to update my legacy code to Flink 1.11. Before I was
>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>> >
>>>>> > EnvironmentSettings settings =
>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>> >
>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's :
>>>>> >
>>>>> > if (!settings.isStreamingMode()) {
>>>>> > throw new TableException(
>>>>> > "StreamTableEnvironment can not run in batch mode for now, please use
>>>>> > TableEnvironment.");
>>>>> > }
>>>>> >
>>>>> > What should I do here?
>>>>> >
>>>>> > Thanks in advance,
>>>>> > Flavio
>>>>>
>>>>>
>>>>


Re: flink 1.11 sql作业提交JM报错

2020-07-12 Thread godfrey he
hi sunfulin,

1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

详细可以参考 [1] [2]



对于 “No operators defined in streaming topology.”,如果使用
TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用
StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()
提交作业,就会出现前面的错误。

对于
“是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset

Best,
Godfrey

Leonard Xu  于2020年7月12日周日 下午1:48写道:

> HI, fulin
>
> 能大致贴下代码吗?能复现异常即可。简单说下这两个方法,
>  TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是
> DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink
> job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …)
> 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。
> Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法,
> 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert
> tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”),
> TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job,
> 这应该不是用户需要的。
> 具体使用根据你的需要来使用。
>
>
> Best,
> Leonard Xu
>
>
> 在 2020年7月11日,22:24,sunfulin  写道:
>
> statementset.execute
>
>
>


Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread godfrey he
hi Flavio,
Only old planner supports BatchTableEnvironment (which can convert to/from
DataSet),
while Blink planner in batch mode only support TableEnvironment. Because
Blink planner
convert the batch queries to Transformation (corresponding to DataStream),
instead of DataSet.

one approach is you can migrate them to TableSource instead (InputFormat
can be reused),
but TableSource will be deprecated later. you can try new table source[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html

Best,
Godfrey

Flavio Pompermaier  于2020年7月10日周五 下午8:54写道:

> Thanks but I still can't understand how to migrate my legacy code. The
> main problem is that I can't create a BatchTableEnv anymore so I can't
> call createInput.
>
> Is there a way to reuse InputFormats? Should I migrate them to TableSource
> instead?
>
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment btEnv =
> TableEnvironment.getTableEnvironment(env);
> MyInputFormat myInputformat =  new MyInputFormat(dsFields,
> ft).finish();
> DataSet rows = env.createInput(myInputformat);
> Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
> CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1,
> WriteMode.OVERWRITE);
> btEnv.registerTableSink("out", dsFields, ft, outSink);
> btEnv.insertInto(table, "out", btEnv.queryConfig());
> env.execute();
>   }
>
> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz 
> wrote:
>
>> You should be good with using the TableEnvironment. The
>> StreamTableEnvironment is needed only if you want to convert to
>> DataStream. We do not support converting batch Table programs to
>> DataStream yet.
>>
>> A following code should work:
>>
>> EnvironmentSettings settings =
>> EnvironmentSettings.newInstance().inBatchMode().build();
>>
>> TableEnvironment.create(settings);
>>
>> Best,
>>
>> Dawid
>>
>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>> > Hi to all,
>> > I was trying to update my legacy code to Flink 1.11. Before I was
>> > using a BatchTableEnv and now I've tried to use the following:
>> >
>> > EnvironmentSettings settings =
>> > EnvironmentSettings.newInstance().inBatchMode().build();
>> >
>> > Unfortunately in the StreamTableEnvironmentImpl code there's :
>> >
>> > if (!settings.isStreamingMode()) {
>> > throw new TableException(
>> > "StreamTableEnvironment can not run in batch mode for now, please use
>> > TableEnvironment.");
>> > }
>> >
>> > What should I do here?
>> >
>> > Thanks in advance,
>> > Flavio
>>
>>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809
>


Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-08 Thread godfrey he
可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
JobClient 可以 cancel 作业,获取 job status。

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API

Best,
Godfrey

Evan  于2020年7月9日周四 上午9:40写道:

> 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
> API有没有提供类似的接口,调用后就能停止这个Stream作业呢?


Re: Flink SQL如何将多个表的查询结果(列不同)聚合成一张表

2020-07-08 Thread godfrey he
select a.table_tmp1.r1 / a.table_tmp2.r2
这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。


zilong xiao  于2020年7月8日周三 下午8:55写道:

> 列如下面这样,需要查询table1 & table2,分别查询不同的字段
> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~
> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from
> (
> (SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS
> table_tmp2,
> )as a
>


Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 Thread godfrey he
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

详细可以参考 [1] [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset

Best,
Godfrey

Zhou Zach  于2020年7月8日周三 下午4:19写道:

> 去掉就好了,感谢解答
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-08 16:07:17,"Jingsong Li"  写道:
> >Hi,
> >
> >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。
> >
> >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")"
> >并没有真正的物理节点。你不用再调用了。
> >
> >Best,
> >Jingsong
> >
> >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach  wrote:
> >
> >>
> >>
> >>
> >> 代码结构改成这样的了:
> >>
> >>
> >>
> >>
> >> val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >>
> >> val blinkEnvSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>
> >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> >> blinkEnvSettings)
> >>
> >>
> >>
> >>
> >>
> >> streamExecutionEnv.execute("from kafka sink hbase")
> >>
> >>
> >>
> >>
> >> 还是报一样的错
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-08 15:40:41,"夏帅"  写道:
> >> >你好,
> >> >可以看看你的代码结构是不是以下这种
> >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> >> >val bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
> >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> >> >  ..
> >> >tableEnv.execute("")
> >> >如果是的话,可以尝试使用bsEnv.execute("")
> >> >1.11对于两者的execute代码实现有改动
> >> >
> >> >
> >> >--
> >> >发件人:Zhou Zach 
> >> >发送时间:2020年7月8日(星期三) 15:30
> >> >收件人:Flink user-zh mailing list 
> >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming
> topology
> >> >
> >> >代码在flink
> >>
> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
> >> >Exception in thread "main" java.lang.IllegalStateException: No
> operators
> >> defined in streaming topology. Cannot generate StreamGraph.
> >> >at
> >>
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> >> >at
> >>
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> >> >at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
> >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
> >> >
> >> >
> >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
> >> >
> >> >
> >> >
> >> >
> >> >query:
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE `user` (
> >> >|uid BIGINT,
> >> >|sex VARCHAR,
> >> >|age INT,
> >> >|created_time TIMESTAMP(3),
> >> >|WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >> >|) WITH (
> >> >|'connector.type' = 'kafka',
> >> >|'connector.version' = 'universal',
> >> >|-- 'connector.topic' = 'user',
> >> >|'connector.topic' = 'user_long',
> >> >|'connector.startup-mode' = 'latest-offset',
> >> >|'connector.properties.group.id' = 'user_flink',
> >> >|'format.type' = 'json',
> >> >|'format.derive-schema' = 'true'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE user_hbase3(
> >> >|rowkey BIGINT,
> >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
> >> >|) WITH (
> >> >|'connector.type' = 'hbase',
> >> >|'connector.version' = '2.1.0',
> >> >|'connector.table-name' = 'user_hbase2',
> >> >|'connector.zookeeper.znode.parent' = '/hbase',
> >> >|'connector.write.buffer-flush.max-size' = '10mb',
> >> >|'connector.write.buffer-flush.max-rows' = '1000',
> >> >|'connector.write.buffer-flush.interval' = '2s'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|insert into user_hbase3
> >> >|SELECT uid,
> >> >|
> >> >|  ROW(sex, age, created_time ) as cf
> >> >|  FROM  

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread godfrey he
Congratulations!

Thanks Zhijiang and Piotr for the great work, and thanks everyone for their
contribution!

Best,
Godfrey

Benchao Li  于2020年7月8日周三 下午12:39写道:

> Congratulations!  Thanks Zhijiang & Piotr for the great work as release
> managers.
>
> Rui Li  于2020年7月8日周三 上午11:38写道:
>
>> Congratulations! Thanks Zhijiang & Piotr for the hard work.
>>
>> On Tue, Jul 7, 2020 at 10:06 PM Zhijiang 
>> wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.11.0, which is the latest major release.
>>>
>>> Apache Flink® is an open-source stream processing framework for distributed,
>>> high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this new major release:
>>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Cheers,
>>> Piotr & Zhijiang
>>>
>>
>>
>> --
>> Best regards!
>> Rui Li
>>
>
>
> --
>
> Best,
> Benchao Li
>


Re: Blink Planner Retracting Streams

2020-06-16 Thread godfrey he
hi John,

You can use Tuple2[Boolean, Row] to replace CRow, the
StreamTableEnvironment#toRetractStream method return DataStream[(Boolean,
T)].

the code looks like:

tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
  override def map(value: (Boolean, Row)): R = ...
})

Bests,
Godfrey

John Mathews  于2020年6月17日周三 下午12:13写道:

> Hello,
>
> I am working on migrating from the flink table-planner to the new blink
> one, and one problem I am running into is that it doesn't seem like Blink
> has a concept of a CRow, unlike the original table-planner.
>
> I am therefore struggling to figure out how to properly convert a
> retracting stream to a SingleOutputStreamOperator when using just the Blink
> planner libraries.
>
> E.g. in the old planner I could do something like this:
> SingleOutputStreamOperator stream =
> tableEnvironment.toRetractStream(table, typeInfo)
> .map(value -> new CRow(value.f1, value.f0);
>
> but without the CRow, I'm not sure how to accomplish this.
>
> Any suggestions?
>
> Thanks!
> John
>
>
>


Re: pyflink数据查询

2020-06-15 Thread godfrey he
hi jack,jincheng

Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
   it.next() 
}

但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng)

但是1.11的TableResult#collect实现对流的query支持不完整(只支持append
only的query),master已经完整支持。

可以参照 jincheng 的意见,(或者结合 TableResult#collect 的实现),完成一个自己的 sink 也可以。

Best,
Godfrey



jincheng sun  于2020年6月15日周一 下午4:14写道:

> 你好 Jack,
>
> >  pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
> 我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
>
> 我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
> 1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
> 2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
> 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
>
> 如果上面回复 没有解决你的问题,欢迎随时反馈~~
>
> Best,
> Jincheng
>
>
>
> Jeff Zhang  于2020年6月9日周二 下午5:39写道:
>
>> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
>> https://www.bilibili.com/video/BV1Te411W73b?p=20
>> 可以加入钉钉群讨论:30022475
>>
>>
>>
>> jack  于2020年6月9日周二 下午5:28写道:
>>
>>> 问题请教:
>>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
>>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>>>
>>> flink能否实现这样的方式?
>>> 感谢
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: pyflink数据查询

2020-06-15 Thread godfrey he
hi jack,jincheng

Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
   it.next() 
}

但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng)

但是1.11的TableResult#collect实现对流的query支持不完整(只支持append
only的query),master已经完整支持。

可以参照 jincheng 的意见,(或者结合 TableResult#collect 的实现),完成一个自己的 sink 也可以。

Best,
Godfrey



jincheng sun  于2020年6月15日周一 下午4:14写道:

> 你好 Jack,
>
> >  pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
> 我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
>
> 我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
> 1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
> 2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
> 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
>
> 如果上面回复 没有解决你的问题,欢迎随时反馈~~
>
> Best,
> Jincheng
>
>
>
> Jeff Zhang  于2020年6月9日周二 下午5:39写道:
>
>> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
>> https://www.bilibili.com/video/BV1Te411W73b?p=20
>> 可以加入钉钉群讨论:30022475
>>
>>
>>
>> jack  于2020年6月9日周二 下午5:28写道:
>>
>>> 问题请教:
>>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
>>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>>>
>>> flink能否实现这样的方式?
>>> 感谢
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: flink sql字段重命名问题

2020-06-12 Thread godfrey he
hi,如 benchao 所说,SELECT XX AS YY 和Table API的renameColumns等价。
而且这些名字仅用于sql解析和优化阶段,实际执行的时候不会使用字段名。

Best,
Godfrey

Benchao Li  于2020年6月12日周五 下午6:36写道:

> 直接用SELECT XX AS YY就等价于Table API的renameColumns了吧。
>
> naisili Yuan  于2020年6月12日周五 下午6:23写道:
>
> > Hi all
> > 想请教下,flink使用sql的时候,字段重命名如何解决呢?看到了table api中的renameColumes接口,flink
> > sql的没有看到明确的接口
> > 我自己试了一下在创建表的sql语句中就加入name_alias  AS
> > name,发现可行,但是这样引入的字段,会不会影响效率,希望能获得解答,谢谢!
> >
>


Re: How to use Hbase Connector Sink

2020-06-11 Thread godfrey he
hi,

you should make sure the types of the selected fields and the types of sink
table are the same,
otherwise you will get the above exception. you can change `active_ratio*25
score` to row type, just like:

insert into circle_weight select rowkey, ROW(info) from (
select concat_ws('_',circleName,dt) rowkey, active_ratio*25 score as info
from tb) t;


Best,
Godfrey

op <520075...@qq.com> 于2020年6月11日周四 下午3:31写道:

> hi
> flink1.10,wen i want to sink data to hbase table like this:
>
>  bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
>rowkey String,
>info ROW
>  ) WITH (
>'connector.type' = 'hbase',
>'connector.version' = '1.4.3',
>'connector.table-name' = 'ms:test_circle_info',
>'connector.zookeeper.quorum' = 'localhost:2181',
>'connector.zookeeper.znode.parent' =
> '/hbase-secure',
>'connector.write.buffer-flush.max-size' =
> '10mb',
>'connector.write.buffer-flush.max-rows' =
> '1000',
>'connector.write.buffer-flush.interval' = '2s'
>  )""")
>
> bstEnv.sqlUpdate(
>   """
> |insert into circle_weight
> |select
> |concat_ws('_',circleName,dt) rowkey,
> |active_ratio*25 score
> |from tb""")
>
> but i get following exceptions,can anybody tell me what is wrong?
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field types of query result and registered TableSink
> default_catalog.default_database.circle_weight do not match.
> Query schema: [rowkey: STRING, score: DOUBLE]
> Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> at scala.Option.map(Option.scala:146)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at
> com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
> at
> com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)
>


  1   2   >