Re: flinksql如何控制结果输出的频率

2020-03-26 文章 Jun Zhang
hi:
你可以自定义一个trigger [1]
第二个场景是可以的,第一种场景我没有遇到过这种场景,你可以试试。

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

flink小猪 <18579099...@163.com> 于2020年3月27日周五 上午11:29写道:

> 我有两个需求
> 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办?
> 2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办?


Re: [udf questions]

2020-03-26 文章 WuPangang
ERROR log:
.
Job has been submitted with JobID 91ac323d4d5338418883240680192f34
Traceback (most recent call last):
  File "", line 1, in 
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/table/table_environment.py",
 line 907, in execute
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
91ac323d4d5338418883240680192f34)
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 91ac323d4d5338418883240680192f34)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=50, 
backoffTimeMS=6)
at 

flinksql如何控制结果输出的频率

2020-03-26 文章 flink小猪
我有两个需求
1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办?
2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办?

Re: [udf questions]

2020-03-26 文章 WuPangang
感谢大佬回复。
根据邮件里面的提示下我尝试了如下操作:

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def str_add(str_name):
  return '1'
table_env.register_function("str_add", str_add)
table_env.sql_update("insert into flink_sinktable_ad_test_1 \
select \
str_add(topicid) AS topicid \
from \
flink_sourcetable_ad_test \
")
目的:我的目的是想通过最简单的方式看看udf是否有生效。
结果:结果依赖没有数据流入近来。
其他手段和测试:我通过不使用udf来验证数据流是否正常的。结果正常。


所以能在分析下么?或者我应该如何深入的跟踪下?


all code below:
from pyflink.datastream import 
StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic
from pyflink.table import StreamTableEnvironment, 
EnvironmentSettings,TableSink,TableConfig,DataTypes
from pyflink.table.descriptors import 
Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json
from pyflink.common import RestartStrategies
from pyflink.table.udf import udf
import json

env = StreamExecutionEnvironment.get_execution_environment()
##contain设置
env.set_parallelism(12)
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6))
##使用blink api
environment_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
table_env = 
StreamTableEnvironment.create(env,environment_settings=environment_settings)

table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \
host STRING, \
type STRING, \
topicid STRING, \
message STRING, \
proctime as PROCTIME() \
) WITH ( \
  'connector.type' = 'kafka',\
  'connector.version' = 'universal', \
  'connector.topic' = 'advertise_module',  \
  'connector.properties.zookeeper.connect' = 'localhost:2181', \
  'connector.properties.bootstrap.servers' = '172.25.80.134:9092', \
  'connector.properties.group.id' = 'flink_1.10_test_source', \
  'connector.startup-mode' = 'latest-offset', \
  'format.type' = 'json' \
)")

table_env.sql_update("CREATE TABLE flink_sinktable_ad_test_1 ( \
topicid STRING \
) WITH ( \
  'connector.type' = 'kafka',\
  'connector.version' = 'universal', \
  'connector.topic' = 'recommend_user_concern_test',  \
  'connector.properties.zookeeper.connect' = 'localhost:2181', \
  'connector.properties.bootstrap.servers' = '172.25.82.77:9092', \
  'connector.properties.group.id' = 'flink_1.10_test_sink', \
  'connector.startup-mode' = 'latest-offset', \
  'connector.properties.retries' = '3', \
  'format.type' = 'json', \
  'connector.properties.update_mode' = 'append' \
)")
@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def str_add(str_name):
  return '1'
table_env.register_function("str_add", str_add)
#table_env.register_function("str_add", udf(lambda i: i + '1', 
DataTypes.STRING(), DataTypes.STRING()))
table_env.sql_update("insert into flink_sinktable_ad_test_1 \
select \
str_add(topicid) AS topicid \
from \
flink_sourcetable_ad_test \
")
table_env.execute('flink_1.10_test’)

--
> 在 2020年3月26日,下午5:55,jincheng sun  写道:
> 
> 比较明显的一个问题是UDF定义有些问题是你的sink表定义了一个STRING类型的字段,但是你SQL里面是SELECT了udf的结果,所以我想你的UDF应该也返回一个STRING,也就是result_type=DataTypes.STRING(),同时确保你udf真的返回的是一个字符串,不是json的Object。
> 
> Best,
> Jincheng
> 
> 
> WuPangang mailto:wpangang1...@icloud.com>> 
> 于2020年3月26日周四 下午5:24写道:
> Data as below:
>  
> {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23
>  
> 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/down-ddz.734399.com
>  
> 

回复:(无主题)

2020-03-26 文章 被惊艳的时光
并发是200和400两种,集群有270多个节点,不过可用的vcores是6600多,内存是17T左右,看了执行图q43这个存在数据倾斜的的问题,失败的节点存在数据量偏大的情况



---原始邮件---
发件人: "Jingsong Li"

Re: (无主题)

2020-03-26 文章 Jingsong Li
Hi,

- 是否是计算规模的问题?
集群大小合适吗?并发合适吗?

- 是否是Plan不优的问题?
Hive的表有做Analysis吗?

CC: user

Best,
Jingsong Lee

On Thu, Mar 26, 2020 at 8:27 PM 被惊艳的时光 <2521929...@qq.com> wrote:

>
> hello,你好,有个关于flink-sql-benchmark工具的问题需要请教下,在做tpc-ds测试时,当数据量达到4T时(flink版本1.10),q43,q67,q70这三条sql执行出错了,都是在hashjoin的时候失败啦,报错信息是hashjoin迭代的次数过多,不知道之前你们在测试时有没有出现这种情况
>


-- 
Best, Jingsong Lee


Re: Re: flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 LakeShen
统一对 Flink 项目源码进行编译打包,你会在 flink-dist 这个模块下面的 target 目录下面看到相关 Flink
命令行的一些东西,同时在lib 包下面,
会有一些 Flink Jar 包

Best wishes,
沈磊

godfrey he  于2020年3月26日周四 下午8:51写道:

> 目前 flink 支持 Scala 2.11 和 Scala 2.12, 默认情况下,通过 mvn package 打包出来的是包含 Scala
> 2.11 的包,例如  flink-table-blink_*2.11*-1.10.0.jar。
> 可以通过  -Dscala-2.12 指定 Scala 的版本是 2.12, 打出来的包是 flink-table-blink_*2.12*
> -1.10.0.jar  这样的。
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn  于2020年3月26日周四
> 下午6:34写道:
>
> >
> > flink-table-uber-blink 下
> >  mvn clean install -DskipTests -Dscala-2.12 -DskipTests
> >
> > 不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的
> >
> > 谢谢,
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> >
> > Sender: Kurt Young
> > Send Time: 2020-03-26 18:15
> > Receiver: user-zh
> > cc: jihongchao
> > Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的
> > flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar)
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
> > wangl...@geekplus.com.cn> wrote:
> >
> > >
> > > 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar
> > > 这个 jar 是从哪里 build 出来的呢?
> > >
> > > 我 clone github 上的源代码,mvn clean package
> > > 我以为 flink-table/flink-table-planner-blink 目录下build 出的
> > > flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的
> > >  flink-table-blink_2.12-1.10.0.jar  是对应的
> > > 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。
> > >
> > > 谢谢,
> > > 王磊
> > >
> > >
> > >
> > > wangl...@geekplus.com.cn
> > >
> > >
> >
>


Flink 1.10: 关于 RocksDBStateBackend In background 状态清理机制的问题

2020-03-26 文章 LakeShen
Hi 社区的小伙伴,

我现在有个关于 Flink 1.10 RocksDBStateBackend 状态清理机制的问题,在 1.10中,RocksDB 默认使用 in
background 方式进行状态清理,使用 compaction filter 方式。正如官方文档所说:

> RocksDB compaction filter will query current timestamp, used to check
> expiration, from Flink every time after processing certain number of state
> entries.


现在有个疑问,RocksDB 在处理一定数量的 State Entrys 就会进行 compaction filter,那么这个 compaction
filter 是针对这一定数量 State Entrys ,然后检查他们是否过期吗?
还是说,会针对一个 Task 当前所有的状态文件,统一进行 Compaction filter,在合并时,检查每个 entry,过期的状态 Key
就过滤删除掉。

这个地方我没有弄明白,非常期待你的回复。

Best wishes,
沈磊


Re: Re: flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 godfrey he
目前 flink 支持 Scala 2.11 和 Scala 2.12, 默认情况下,通过 mvn package 打包出来的是包含 Scala
2.11 的包,例如  flink-table-blink_*2.11*-1.10.0.jar。
可以通过  -Dscala-2.12 指定 Scala 的版本是 2.12, 打出来的包是 flink-table-blink_*2.12*
-1.10.0.jar  这样的。

Best,
Godfrey

wangl...@geekplus.com.cn  于2020年3月26日周四 下午6:34写道:

>
> flink-table-uber-blink 下
>  mvn clean install -DskipTests -Dscala-2.12 -DskipTests
>
> 不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
> Sender: Kurt Young
> Send Time: 2020-03-26 18:15
> Receiver: user-zh
> cc: jihongchao
> Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的
> flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar)
>
> Best,
> Kurt
>
>
> On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> >
> > 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar
> > 这个 jar 是从哪里 build 出来的呢?
> >
> > 我 clone github 上的源代码,mvn clean package
> > 我以为 flink-table/flink-table-planner-blink 目录下build 出的
> > flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的
> >  flink-table-blink_2.12-1.10.0.jar  是对应的
> > 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。
> >
> > 谢谢,
> > 王磊
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>


Flink Weekly | 每周社区动态更新 - 2020/03/26

2020-03-26 文章 forideal
大家好,本文为 Flink Weekly 的第十期,由张成整理,主要内容包括:近期社区开发进展,邮件问题答疑以及社区直播和相关技术博客。
社区开发进展 

[release] 关于发布 Flink 1.10.1 的讨论正在火热进行,最新消息请参考 Yu Li 发起的讨论。

[1]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-td38689.html




[Checkpoint] Arvid Heise 发起 FLIP-76 的投票已经通过。FLIP-76 提出了一种基于检查点屏障的非阻塞对齐执行检查点的方法。

相关好处有:

即使某些 Operator 仍在等待正在输入通道上的检查点屏障,上游仍可以继续产生数据。

即使对于具有单个输入通道的 Operator,在整个执行图中的检查点次数也大大减少。

即使在不稳定的环境中,最终用户也将看到更多的进展,因为更及时的检查点将避免过多的重复计算。

促进更快地 rescaling。

更多信息参考:

[2]https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints

[3]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-76-Unaligned-checkpoints-td33651.html




[Connectors/Filesystem] 删除 BucketingSink。BucketingSink 已经在 Flink 1.9 
版本标记为过期。Flink 有一个新的 StreamingFileSink 替代 BucketingSink。目前 StreamingFileSink 的 
scala 版本存在 bug。

[4]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/jira-Created-FLINK-16616-Drop-BucketingSink-td38950.html

[5]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-Bucketing-Sink-td38830.html#a38831

[6]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/jira-Created-FLINK-16684-StreamingFileSink-builder-does-not-work-with-Scala-td39109.html




[Table API & SQL] Jingsong Li 发起了引入 StatefulSequenceSource 的讨论。这个能够方便用户更好的进行测试 
SQL。最终讨论决定在 Table 支持 DataGenerator 的 source、Print 的 sink 和blackhole 的 sink。

[7]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-TableFactory-for-StatefulSequenceSource-td39116.html




[sql] Timo 分享了一个关于新的 TableSource 和 TableSink 
接口的提案(FLIP-95)。Jark、Dawid、Aljoscha、Kurt、Jingsong 
等参考了讨论。其目标是简化当前的接口架构,以支持变更日志源(FLIP-105)和删除对 DataStream API 和 planner 的依赖。

[8]https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces




[hadoop]跟进 Stephan 和 Till 的讨论。Sivaprasanna 分享了 Hadoop 
相关实用程序组件的概述,以开始讨论将其移动到单独的模块中 “flink-hadoop-utils”。

[9]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SerializableHadoopConfiguration-td38371.html

[10]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-a-new-module-flink-hadoop-utils-td39107.html




用户问题

叶贤勋在使用 Hive Source 的时候遇到了 Kerberos 认证的问题,社区同学进行了相关的讨论和建议,感兴趣的同学可以参考如下链接:

[11]http://apache-flink.147419.n8.nabble.com/Hive-Source-With-Kerberos-td1688.html




hiliuxg 在社区提问 Flink SQL 如何支持每隔 5 分钟触发当日零点到当前 5 分钟的聚合计算。Jark Wu 和 Tianwang Li 
进行了相关解答。

[12]http://apache-flink.147419.n8.nabble.com/flink-sql-5-5-td2011.html




hiliuxg 在社区提问 Flink SQL COUNT DISTINCT 性能优化。Benchao Li、田志声、Lucas Wu、Lake Shen 
展开了一些讨论,有兴趣的同学可以参考如下链接:

[13]http://apache-flink.147419.n8.nabble.com/flink-sql-td2012.html




王志华 在社区提问 Flink DDL 如何支持自定义 Source/Sink 表。社区同学在邮件中进行了详细的回答。

[14]http://apache-flink.147419.n8.nabble.com/ddl-td1959.html




111 在社区提问 Flink SQL1.10 大表 join 如何优化?Jark Wu、Kurt Young 和 Jingsong Lee 
进行了详细的解答。目前 Flink SQL 的并行度(非 Source 
)并不是自动推断出来的,需要通过设置table.exec.resource.default-parallelism,详细的内容参考:

[15]http://apache-flink.147419.n8.nabble.com/Flink-SQL1-10-join-td2044.html

[16]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-How-can-i-set-parallelism-in-clause-of-group-by-td33736.html




Aaron Levin 在社区提问 如何能够做到修改任务的并发,然后从 checkpoint 启动任务。Piotr Nowojski、Till 
Rohrmann 参与了相关讨论。内容涉及到 unaligned checkpoints (FLIP-76) 对savepoint 和 checkpoint 
的影响。同时 Lake Shen 也提出了类似的问题。有兴趣的同学可以参考

[17]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Expected-behaviour-when-changing-operator-parallelism-but-starting-from-an-incremental-checkpoint-td33608.html

[18]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cancel-the-flink-task-and-restore-from-checkpoint-can-I-change-the-flink-operator-s-parallelism-td33613.html




Jiawei Wu 在社区提问“如何使用 Flink SQL 计算 按照供应商分组同时入库时间大于 15 天的库存数据?”,有兴趣的同学可以参考:

[19]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-flink-to-calculate-sum-of-the-inventory-under-certain-conditions-td33323.html




Vinod Mehra 在社区提出了一个关于 Join 相关的问题。这个问题比较复杂,Timo Walther 进行了相关解答。里面涉及到了一些如何进行 
Flink SQL 问题的排查。有兴趣的同学可以参考:

[20]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/time-windowed-joins-and-tumbling-windows-td33551.html

活动博客文章及其他




SQL 开发任务超 50% !滴滴实时计算的演进与优化

[21]https://ververica.cn/corporate_practice/evolution-and-optimization-of-didi-real-time-computing/




Flink 生态:一个案例快速上手 PyFlink

[22]https://ververica.cn/developers/pyflink-a-case-in-hand/




一套 SQL 
搞定数据仓库?Flink有了新尝试[23]https://ververica.cn/developers/a-set-of-sql-to-handle-data-warehouse/




如何在 Flink 中规划 RocksDB 内存容量?

[24]https://ververica.cn/developers/how-to-plan-the-memory-capacity-of-rocksdb-in-flink/





Re: Re: flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 wangl...@geekplus.com.cn

flink-table-uber-blink 下
 mvn clean install -DskipTests -Dscala-2.12 -DskipTests

不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的

谢谢, 
王磊


wangl...@geekplus.com.cn 
 
Sender: Kurt Young
Send Time: 2020-03-26 18:15
Receiver: user-zh
cc: jihongchao
Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的
flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar)
 
Best,
Kurt
 
 
On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
>
> 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar
> 这个 jar 是从哪里 build 出来的呢?
>
> 我 clone github 上的源代码,mvn clean package
> 我以为 flink-table/flink-table-planner-blink 目录下build 出的
> flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的
>  flink-table-blink_2.12-1.10.0.jar  是对应的
> 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>


Re: 回复: Flink JDBC Driver是否支持创建流数据表

2020-03-26 文章 godfrey he
还有一种方式是sql gateway 支持 --jar 和 --library 指定用户的jar,这种方式不需要用户将jar放到flink的lib下

godfrey he  于2020年3月25日周三 下午6:24写道:

> hi 赵峰,
>
> 你出现的这个问题,是在classpath中找不到Kafka相关TableFactory,按照zhenghua说的方式可以解决。但是现在Flink
> JDBC Driver只支持Batch模式,而Kafka table source目前只支持stream模式。
>
> Best,
> Godfrey
>
> Zhenghua Gao  于2020年3月25日周三 下午4:26写道:
>
>> 请确认一下 kafka connector 的jar包是否在 flink/lib 下。
>> 目前的报错看起来是找不到kafka connector的jar包。
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Wed, Mar 25, 2020 at 4:18 PM 赵峰  wrote:
>>
>> > 不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中
>> >
>> >
>> > 
>> >
>> > 参考下这个文档:
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>> > 下面的语法应该是不支持的:
>> >   'format.type' = 'csv',\n" +
>> > "'format.field-delimiter' = '|'\n"
>> >
>> > 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90}
>> > tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
>> > + "order_no VARCHAR,\n"
>> > + "status INT\n"
>> > + ") WITH (\n"
>> > + "'connector.type' = 'kafka',\n"
>> > + "'connector.version' = 'universal',\n"
>> > + "'connector.topic' = 'wanglei_test',\n"
>> > + "'connector.startup-mode' = 'latest-offset',\n"
>> > + "'connector.properties.0.key' = 'zookeeper.connect',\n"
>> > + "'connector.properties.0.value' = 'xxx:2181',\n"
>> > + "'connector.properties.1.key' = 'bootstrap.servers',\n"
>> > + "'connector.properties.1.value' = 'xxx:9092',\n"
>> > + "'update-mode' = 'append',\n"
>> > + "'format.type' = 'json',\n"
>> > + "'format.derive-schema' = 'true'\n"
>> > + ")");
>> >
>> > 王磊
>> >
>> >
>> > wangl...@geekplus.com.cn
>> > 发件人: 赵峰
>> > 发送时间: 2020-03-24 21:28
>> > 收件人: user-zh
>> > 主题: Flink JDBC Driver是否支持创建流数据表
>> > hi
>> >
>> > Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
>> > Connection connection =
>> >
>> DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
>> > Statement statement = connection.createStatement();
>> > statement.executeUpdate(
>> > "CREATE TABLE table_kafka (\n" +
>> > "user_id BIGINT,\n" +
>> > "item_id BIGINT,\n" +
>> > "category_id BIGINT,\n" +
>> > "behavior STRING,\n" +
>> > "ts TIMESTAMP(3),\n" +
>> > "proctime as PROCTIME(),\n" +
>> > "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
>> > ") WITH (\n" +
>> > "'connector.type' = 'kafka', \n" +
>> > "'connector.version' = 'universal', \n" +
>> > "'connector.topic' = 'flink_im02', \n" +
>> > "'connector.properties.group.id' = 'flink_im02_new',\n" +
>> > "'connector.startup-mode' = 'earliest-offset', \n" +
>> > "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
>> > "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
>> > "'format.type' = 'csv',\n" +
>> > "'format.field-delimiter' = '|'\n" +
>> > ")");
>> > ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
>> > while (rs1.next()) {
>> > System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
>> > }
>> > statement.close();
>> > connection.close();
>> > 报错:
>> > Reason: Required context properties mismatch.
>> > The matching candidates:
>> > org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> > Mismatched properties:
>> > 'connector.type' expects 'filesystem', but is 'kafka'
>> > 赵峰
>> >
>> > 
>> > Quoted from:
>> >
>> http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html
>> >
>> >
>> >
>> >
>> > 赵峰
>>
>


Re: flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 Kurt Young
flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar)

Best,
Kurt


On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar
> 这个 jar 是从哪里 build 出来的呢?
>
> 我 clone github 上的源代码,mvn clean package
> 我以为 flink-table/flink-table-planner-blink 目录下build 出的
> flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的
>  flink-table-blink_2.12-1.10.0.jar  是对应的
> 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>


Re: [udf questions]

2020-03-26 文章 jincheng sun
比较明显的一个问题是UDF定义有些问题是你的sink表定义了一个STRING类型的字段,但是你SQL里面是SELECT了udf的结果,所以我想你的UDF应该也返回一个STRING,也就是result_type=DataTypes.STRING(),同时确保你udf真的返回的是一个字符串,不是json的Object。

Best,
Jincheng


WuPangang  于2020年3月26日周四 下午5:24写道:

> Data as below:
>  
> {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23
> 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/
> down-ddz.734399.com\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"country\":\"\",\"province\":\"\",\"city\":\"\",\"setup_source\":\"androiddefault\",\"seller_id\":\"139091\",\"plan_id\":\"380141\",\"plan_name\":\"HL-10-12.27-\\u5927\\u80f8-\\u6e38\\u620f\",\"put_platform_id\":\"25\",\"put_ad_type_id\":\"27\",\"put_position_id\":\"0\",\"put_sell_type_id\":\"0\",\"material_id\":\"378120\",\"show_id\":\"SMALL_VIDEO_6948314\",\"put_start_time\":\"1577808000\",\"plan_params\":\"advertise-380141-378120\",\"download_app_name\":\"\\u7231\\u73a9\\u6597\\u5730\\u4e3b\",\"ad_type\":\"0\",\"put_source\":\"1\",\"is_ad_recommend\":\"\",\"third_video_url\":\"\",\"third_img_url\":\"\",\"ua\":\"JuMei\\/
> (PRA-AL00X; Android; Android OS ; 8.0.0; zh)
> ApacheHttpClient\\/4.0\",\"platform_v\":\"2.100\",\"played_time\":\"\",\"video_time\":\"\",\"status\":1,\"target_link\":\"http:\\/\\/
> down-ddz.734399.com
> \\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"ad_material_title\":\"\\u5de5\\u8d44\\u6ca13W\\uff0c\\u5c31\\u73a9\\u8fd9\\u6597\\u5730\\u4e3b\\uff0c\\u6bcf\\u5929\\u90fd\\u80fd\\u9886\\u7ea2\\u5305~\",\"ad_material_desc\":\"\\u73a9\\u6597\\u5730\\u4e3b\\uff0c\\u7ea2\\u5305\\u79d2\\u4f53\\u73b0\",\"icon_url\":\"http:\\/\\/
> p12.jmstatic.com
> \\/adv\\/\\/material\\/20190920\\/jmadv5d849b203750f.png\",\"button_text\":\"\\u7acb\\u5373\\u4e0b\\u8f7d\",\"material_type\":\"video\",\"third_app_id\":\"\",\"third_pos_id\":\"\",\"download_apk\":\"\",\"package_name\":\"\",\"h5_url\":\"\",\"message\":\"\",\"unit_price\":\"4.15\",\"balance_type\":\"3\",\"ecpm\":\"15.189\",\"accumulate_ctr\":\"0.366\",\"pre_ctr\":\"0.366\",\"real_unit_price\":\"0\",\"pre_charge\":\"0\",\"pre_change\":\"0\",\"subsidy_type\":\"\",\"subsidy_ratio\":\"\",\"suppress_type\":\"0\",\"suppress_ratio\":\"0\",\"two_ecpm\":\"15.165\",\"real_ecpm\":\"0\",\"real_unit_price_threshold\":\"0\",\"plan_accumulate_pv\":\"0\",\"plan_accumulate_cpv\":\"0\",\"plan_accumulate_change\":\"0\",\"request_id\":\"ad2c15b7cf910cc6-1584955121.768552967\",\"ad_activity_type\":\"0\",\"one_cost\":\"4.15\",\"bid\":\"0.366\",\"real_spr\":\"0.998419\",\"one_ecpm\":\"15.189\",\"es_one\":\"0,16,\",\"es_two\":\"0\",\"es_three\":\"0\",\"es_four\":\"0\",\"age\":\"1020\",\"sex\":\"0\",\"one_cost_expend\":\"4.15\",\"two_cost_expend\":\"4.153438\",\"reward_source\":\"\",\"provide\":\"\"}","topicid":"advertise_module","project":"advertise-api”}
> Problem:
> 数据是个嵌套json,并且核心字段message的格式不能直接通过table api json 相关的方法来处理。
> 自己思考的解决思路:通过udf, 使用json.loads来处理。
> 实际使用中遇到的问题: job提交之后,在dashboard上发现bytes received,records recevied
> 都是0;下游是同步给Kafka,去消费下游kafka也没有数据。
>
> Code as below:
> from pyflink.datastream import
> StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic
> from pyflink.table import StreamTableEnvironment,
> EnvironmentSettings,TableSink,TableConfig,DataTypes
> from pyflink.table.descriptors import
> Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json
> from pyflink.common import RestartStrategies
> from pyflink.table.udf import udf
> import json
>
> env = StreamExecutionEnvironment.get_execution_environment()
> #env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> ##checkpoint设置
> #env.enable_checkpointing(30)
>
> #env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode().EXACTLY_ONCE)
> #env.get_checkpoint_config().set_min_pause_between_checkpoints(3)
> #env.get_checkpoint_config().set_checkpoint_timeout(6)
> #env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
> #env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(False)
> ##contain设置
> env.set_parallelism(12)
> env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6))
> ##使用blink api
> environment_settings =
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> table_env =
> 

flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 wangl...@geekplus.com.cn

单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar 
这个 jar 是从哪里 build 出来的呢?
 
我 clone github 上的源代码,mvn clean package
我以为 flink-table/flink-table-planner-blink 目录下build 出的 
flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的   
flink-table-blink_2.12-1.10.0.jar  是对应的
我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。

谢谢,
王磊



wangl...@geekplus.com.cn 



Re: flink1.10 & pyflink相关问题咨询

2020-03-26 文章 jincheng sun
看你错误日志运行的示例使用了PyUDFDemoConnector,也就是参考的博客[1],
在写这个博客时候1.10还没有发布,在发布之后接口有变化,所以PyUDFDemoConnector有个问题,我前两天进行了更新。你可以更新一下JAR。

另外你发的问题很久之前,发布1.10之前已经fix了[2],所以你更新一下connector在测试一下看看。有问题继续沟通。

Best,
Jincheng
[1]
https://enjoyment.cool/2019/12/05/Apache-Flink-%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-%E5%A6%82%E4%BD%95%E5%9C%A8PyFlink-1-10%E4%B8%AD%E8%87%AA%E5%AE%9A%E4%B9%89Python-UDF/
[2] https://issues.apache.org/jira/browse/FLINK-14581


zilong xiao  于2020年3月25日周三 下午12:19写道:

> 是的,有一个关键步骤:`source
> py36/bin/activate`是在文档中未体现的,执行该步骤后提交到yarn集群可以正常工作,然后最近在进一步研究1.10对于udf的支持,在尝试提交udf作业时,会出现如下异常:
>
> Caused by: java.io.IOException: Cannot run program
> "xxx/pyflink-udf-runner.sh": error=2, No such file or directory
>
> 提交作业前的操作如下:
> 1.pip install virtualenv
> 2.virtualenv --always-copy venv
> 3.venv/bin/pip install apache-beam==2.15.0
> 4.venv/bin/pip install apache-flink
> 5.venv/bin/pip install pydemo.tar.gz
> 6.zip -r venv.zip venv
> 7.bin/flink run -pyarch venv.zip -pyexec venv.zip/venv/bin/python -py
> ./word_count_socket.py -j pydemo.jar
>
> 不知道前辈是否有遇到过类似情况呢?
>
> 完整异常栈信息 & 作业见附件
>
> jincheng sun  于2020年3月19日周四 下午12:08写道:
>
>> 开心看到你在使用PyFlink 1.10,您遇到的问题,核心问题和将解决方式如下:
>>
>> 1.
>> 利用shell的alias功能更改python命令指向是无效的,因为flink不通过shell启动Python进程。所以对flink来说本地python环境依然是python2.
>> 2. 可以通过virtualenv, conda等工具创建python3.5+的环境,并激活,在激活了的环境下提交python job。 比如:
>>   pip install virtualenv
>>   virtualenv --python /usr/local/bin/python3 py36
>>   source py36/bin/activate
>>   flink run -py pyflink.py
>> 3. 另外也可以修改python命令的软链接,令其指向python3.5+。
>>
>> 你可以尝试一下,有问题随时邮件交流!
>>
>> Best,
>> 孙金城(金竹)
>>
>>
>>
>> zilong xiao  于2020年3月18日周三 下午12:14写道:
>>
>>> hi,金竹前辈您好,我是一名从事实时计算方向的IT工作者,最近在使用flink1.10 &
>>> pyflink时遇到一点问题,希望能加下您的钉钉或者其他联系方式和您进一步交流,问题大概描述如下:
>>>
>>> 任务提交环境:
>>> Apache-beam:2.15.0
>>> 本地python:2.7(已配置python3.7,通过修改~/.zshrc,alias
>>> python='/usr/local/bin/python3.7')
>>> pip:20.0.2
>>> flink:1.10
>>>
>>> 提交命令:bin/flink run -pyarch tmp/venv.zip -pyexec
>>> tmp/venv.zip/venv/bin/python3 -py word_count.py
>>>
>>> 在本地尝试以pre-job模式部署作业时,发现会提示如下报错,导致任务提交失败
>>>
>>> RuntimeError: Python versions prior to 3.5 are not supported for PyFlink
>>> [sys.version_info(major=2, minor=7, micro=16, releaselevel='final',
>>> serial=0)].
>>>
>>>
>>> 显而易见,正如flink官方文档所说flink1.10作业必须要求python3.5+,我通过-pyarch
>>> -pyexec来指定任务执行环境以及解释器环境,发现这两个指令貌似没生效,或者说没有作用,还是会有如上异常,具体执行过程都是参考您的文档:
>>> https://enjoyment.cool/2020/01/02/Apache-Flink-%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-PyFlink-%E4%BD%9C%E4%B8%9A%E7%9A%84%E5%A4%9A%E7%A7%8D%E9%83%A8%E7%BD%B2%E6%A8%A1%E5%BC%8F/#more
>>> 来操作的,我在想可能还是我的打开方式不对,亦或该指令还存在隐藏问题?可是网上也没有太多的资料,所以希望能和前辈您交流交流,帮我解开这个疑惑,期待前辈您的回复。
>>>
>>


Re: Flink 1.10 JSON 解析

2020-03-26 文章 Zhenghua Gao
Hi 张宇

看起来是TypeMappingUtils中校验字段物理类型和逻辑类型的bug。
开了一个issue: https://issues.apache.org/jira/browse/FLINK-16800

*Best Regards,*
*Zhenghua Gao*


On Fri, Mar 20, 2020 at 5:28 PM 宇张  wrote:

> hi,
> 了解了,我重新整理一下:
> streamTableEnv
> .connect(
> new Kafka()
> .version("0.11")
> .topic("mysql_binlog_test")
> .startFromEarliest()
> .property("zookeeper.connect",
> "localhost:2181")
> .property("bootstrap.servers",
> "localhost:9092")
> )
> .withFormat(
> new Json()
> )
> .withSchema(
> new Schema()
> .field("business", DataTypes.STRING())
> .field("data", DataTypes.ARRAY(
> DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT()),
> DataTypes.FIELD("vendor_id",
> DataTypes.DOUBLE()),
> DataTypes.FIELD("status",
> DataTypes.BIGINT()),
> DataTypes.FIELD("create_time",
> DataTypes.BIGINT()),
> DataTypes.FIELD("tracking_number",
> DataTypes.STRING()),
> DataTypes.FIELD("invoice_no",
> DataTypes.STRING()),
> DataTypes.FIELD("parent_id",
> DataTypes.BIGINT()
> .field("database", DataTypes.STRING())
> .field("old",
> DataTypes.ARRAY(DataTypes.ROW(DataTypes.FIELD("logistics_status",
> DataTypes.DECIMAL(38,18)
> .field("table", DataTypes.STRING())
> .field("ts", DataTypes.BIGINT())
> .field("type", DataTypes.STRING())
> .field("putRowNum", DataTypes.BIGINT())
> )
> .createTemporaryTable("Test");
> 这里面old复合字段里面子字段的类型使用DECIMAL时抛出异常,采用其他类型是可以的;
> 异常:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type ARRAY> of table field 'old'
> does not match with the physical type ARRAY LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return
> type.
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
> at
> org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at
>
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> at
>
> 

Re: Re: Re: Re: 向您请教pyflink在windows上运行的问题,我第一次接触flink。

2020-03-26 文章 jincheng sun
第一行错误信息是没有安装 bash ?

xu1990xaut  于2020年3月26日周四 下午12:12写道:

> 孙老师,我按照您视频里的方法把flink包安装好了。  但是运行您提供得demo时出现下面这个错误。  我在网上找了好久还是没解决。
> 望老师再指点指点。
>
>
>
>
>
> 在 2020-03-25 15:47:49,"jincheng sun"  写道:
>
> 哦,PyFlink目前不支持windows。
>
> Best,
> Jincheng
> -
> Twitter: https://twitter.com/sunjincheng121
> -
>
>
> xu1990xaut  于2020年3月25日周三 下午2:55写道:
>
>> 谢谢孙老师。   我用的就是这个示例。另外我看到python下又两个flink版本,一个是import flink,一个是import
>> pyflink。 pyflink是不是不能在windows下运行?
>> python下的flink我确定是安装正确的。
>> 运行flink是也启动了start-cluster.bat(start-clust.sh),但是pycharm控制台很久不出结果,cpu的占用率也正常。
>> 我实在不知道是哪里问题。
>>
>>
>>
>>
>>
>> 在 2020-03-25 14:44:25,"jincheng sun"  写道:
>>
>> 上面视频中对应的word_count示例的源码应该是这个:
>> https://github.com/sunjincheng121/enjoyment.code/blob/master/myPyFlink/enjoyment/word_count.py运行完成之后计算结果应该是写到sink_file
>> = 'sink.csv'文件里面去了。你可以将这个文件的路径打印出来,查看这个文件内容。
>>
>> 另外如果您只是为了学习入门的话,建议你查阅[1][2], 我让想整理了解PyFlink最新的状况,可以查看[3]。
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/installation.html
>> [2]
>> https://enjoyment.cool/2020/01/22/Three-Min-Series-How-PyFlink-does-ETL/#more
>> [3]
>> https://www.bilibili.com/video/BV1W7411o7Tj?from=search=14518199503613218690
>>
>> Best,
>> Jincheng
>> -
>> Twitter: https://twitter.com/sunjincheng121
>> -
>>
>>
>> xu1990xaut  于2020年3月25日周三 下午2:23写道:
>>
>>> 孙老师您好,我之前在网上看的是这个视频《【Apache Flink 进阶教程】14课. Apache Flink Python API
>>> 的现状及未来规划》。 今天我也在虚拟机下试了,还是无法运行。
>>> 我用的是flink1.10,python3.6。  麻烦老师指点指点。
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-03-25 11:32:29,"jincheng sun"  写道:
>>>
>>> 很高兴收到您的邮件,我不太确定您具体看的是哪一个视频,所以很难确定您遇到的问题原因。您可以参考官方文档[1],
>>> 同时我个人博客里有一些3分钟的视频和文档入门教程[2],您可以先查阅一下。如又问题,可以保持邮件沟通,可以在我的博客留言!
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/tutorials/python_table_api.html
>>> [2] https://enjoyment.cool/
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>>
>>> xu1990xaut  于2020年3月24日周二 下午11:36写道:
>>>
 您好,之前在哔哩哔哩上看过您讲的视频。  也跟着视频动手做了。
 我用的flink1.10,在pip的时候是直接pip install apache-flink,结果默认就是1.10版本。
 然后我在pycharm中运行word-count这个脚本时,一直不出结果,也不报错。   请问这是什么原因。
 我也装了jdk,另外页面访问flink8081那个端口也可以出来界面。  我是第一次接触flink,在网上也搜过这个问题,
 可是一直没有得到答案。  麻烦您,给小弟指点指点,   谢谢您了。




>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
>
>
>


订阅用户邮件列表

2020-03-26 文章 zhanglianzhg
你好!
我想订阅用户邮件列表,关注及解答用户问题,谢谢!!

NetworkBufferPool的使用

2020-03-26 文章 yanggang_it_job
Hi:

观察flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments的值发现这个值很大,也就是说NetworkBufferPool还很充裕,可我的任务还是发生了背压告警。
请问各位大佬这是为什么呢?