环境:
flinksql 1.12.2
k8s session模式
描述:
当kafka 端口错误,过一段时间会有如下报错:
2021-04-25 16:49:50
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired
before the position for partition filebeat_json_install_log-3 could be
determined
当kafka ip错误,过一段时间会有如下报错:
2021-04-25 20:12:53
我看官方文档[1]应该是设置state.checkpoints.num-retained , 默认是1, 但是设置了没有效果, 官方说默认是1,
但是我发现好像是10 ,
同时我也设置了其他的属性,比如
execution.checkpointing.externalized-checkpoint-retention:
RETAIN_ON_CANCELLATION
是可行,所以我的设置应该没有什么问题
1使用系统类加载器的时候,本身作业包开放给外部UDF jar包实现的接口会报ClassNotFound异常
2线程上下文类加载器是什么
不太明白这两点,可以写个代码例子看看吗
在 2020-10-15 19:47:20,"amen...@163.com" 写道:
>追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗?
>那这种设置env的方式有可能还会造成其他什么问题?
>
>best,
>amenhub
>
>发件人: amen...@163.com
>发送时间: 2020-10-15 19:22
>收件人:
集群是local Standalone模式,任务可以正常的运行, sink是print,能在Stdout看到数据输出,
但是在任务详情页面没有Bytes Received, Records Received, Bytes Sent , Records Sent等实时数据 都是0
flink1.11.2 在k8s上部署,如何启动history server
之前1.10的yaml里面可以加命令,但是1.11的yaml是通过docker-entrypoint.sh
好像没发现这个入口脚本没有对应的history server参数
上面最后说的splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(),
DataTypes.BIGINT()])需要改成这个地方splitStr = udtf(SplitStr(), DataTypes.STRING(),
[DataTypes.STRING(), DataTypes.STRING()])udtf的第三个参数,
好像只要是能跟sink的字段类型对应就能运行, 但是第二个参数并不能跟source字段对应却能运行就感觉有点奇怪
在 2020-09-30 19:07:06,"chenx
版本:
pyflink==1.0
apache-flink==1.11.2
代码如下:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
'true')
class
我在使用k8s部署的时候也是按照官网的方式[1],是正常使用的, 然后后面加了volume配置
{
...
"spec": {
...
"template": {
...
"spec": {
"volumes": [
...
{
"name": "libs-volume",
"hostPath": {
"path":
>所以你mount进去会被修改掉
>
>[1].
>https://github.com/apache/flink-docker/blob/dev-1.11/docker-entrypoint.sh
>
>
>Best,
>Yang
>
>chenxuying 于2020年9月27日周日 下午7:56写道:
>
>> 根据官网[1]使用docker部署flink,session cluster模式
>> 环境win10+docker+flink1.11.2
>> cmd命令
>>
根据官网[1]使用docker部署flink,session cluster模式
环境win10+docker+flink1.11.2
cmd命令
docker run ^
-d^
--rm ^
--name=jobmanager ^
--hostname=jobmanager ^
--network flink-network ^
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
-p 28081:8081 ^
flink:1.11.2-scala_2.11
flinksql 版本是1.11.2
source接收到字段是字符串类型的时间
CREATE TABLE sourceTable (
`time` STRING
) WITH(
...
);
sink如下
CREATE TABLE sinktable (
`time1` STRING,
`time` TIMESTAMP(3)
) WITH (
'connector' = 'print'
);
insert语句,不知道怎么正确修改TO_TIMESTAMP默认的格式
insert into sinktable select
好的, 明白
在 2020-09-17 20:29:09,"Jark Wu" 写道:
>> sink.buffer-flush.max-rows = '0' 导致每接收一条数据就插入数据库
>
>这个应该是个 bug,我建了个 issue:https://issues.apache.org/jira/browse/FLINK-19280
>
>Best,
>Jark
>
>On Thu, 17 Sep 2020 at 18:15, chenxuying wrote:
>
>&g
环境是flink1.11.2+idea
sql:
CREATE TABLE sourceTable (
platform STRING
,game_id bigint
) WITH (
...
);
CREATE TABLE sinktable (
platform STRING
,game_id bigint
) WITH (
'connector' = 'jdbc',
'url' = '',
'table-name' = '',
'driver' = 'com.mysql.jdbc.Driver',
好的 , 原来是bug , 感谢回答
在 2020-08-12 21:32:40,"Benchao Li" 写道:
>看起来是一个已知bug[1],已经修复,但是还没有发布。
>
>[1] https://issues.apache.org/jira/browse/FLINK-18862
>
>chenxuying 于2020年8月12日周三 下午9:25写道:
>
>> 版本:
>> flinksql 1.11.0
>> 需求:
&g
版本:
flinksql 1.11.0
需求:
需要实现多行聚合成一行功能
代码如下:
environmentSettings =
EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings =
environmentSettings)
不知道您是否知道原因
在 2020-08-04 12:11:32,"godfrey he" 写道:
>逻辑上批产生的结果是Table,流产生的结果是Changelog。
>你在例子中Table的结果和changelog的结果是一样的,所以你感觉差不多。
>最简单的方式可以将query改为带group by的,再看结果的差异。
>更多关于Table和Changelog的概念可以参考 [1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/z
hi :
flink table sql 1.11.0
在EnvironmentSettings中可以设置BatchMode或StreamingMode
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();
如果使用mysql作为source , 使用这两种模式都可以运行 , 效果都一样 ,
cs-master/dev/table/connectors/jdbc.html#idempotent-writes>
>
>
>> 在 2020年8月3日,10:33,chenxuying 写道:
>>
>> 你好,我这边只是做一个尝试 , 因为之前使用了insert into + 主键做到更新db记录的操作 , 然后现在看到INSERT
>> OVERWRITE的语法就试了一下 , 原来OVERWRITE目前只支持 Filesystem connector 和 Hive table
>> 然后还想问下在使
://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling>
>
>> 在 2020年8月1日,19:20,chenxuying 写道:
>>
>> Hello
>> 请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert
>> ove
bleSink to implement
SupportsOverwrite interface.
是得自定义connector吗,实现DynamicTableSink?
祝好
chenxuying
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
hi
ok,谢谢,懂了哈哈
在 2020-07-31 21:27:02,"Leonard Xu" 写道:
>Hello
>
>> 在 2020年7月31日,21:13,chenxuying 写道:
>>
>> 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做
>
>简单来讲,如果使用的是老版本(1.10)的option参数,代码执行的路径就和1.10版本一样的,1.10版本里是不支持定义 PRIMARY
谢谢回答
使用新属性可以 成功修改记录 ,
但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做
在 2020-07-31 16:46:41,"Leonard Xu" 写道:
>Hi, chenxuying
>
>看你还是用的还是 " 'connector.type' = 'jdbc', …. "
>,这是老的option,使用老的option参数还是需要根据query推导主键,
>需要使用新的属性[1]:&q
hi
我使用的flink 1.11.0版本
代码如下
StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
tableEnvironment.executeSql(" " +
" CREATE TABLE mySource ( " +
" a bigint, " +
" b bigint " +
"
>memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>'80m')。你可以参考文档上的例子,以及对应的note说明[1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions
>
>Best,
>Xingb
官方例子:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
按照例子写了程序,也安装了pyflink
|
python -m pip install apache-flink
|
代码:
|
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import
目前使用的是flink 1.10.0
背景:
REST API有一个提交job的接口
接口 /jars/:jarid/run
参数entryClass,programArgs,parallelism,jobId,allowNonRestoredState,savepointPath
如果使用命令行方式提交job
flink run -C file:///usr/local/soft/flink/my-function-0.1.jar -c
cn.xuying.flink.table.sql.ParserSqlJob
你好,请问下,my_parse是个udf吧
然后有没有什么操作可以使用udtf解析出多个字段 , 这些字段直接就是source表的字段 , 然后选出时间字段定义 watermark ,
类似如下
CREATE TABLE sourceTable(
request_uri STRING,
(column_1,column_2,heart_time) as udtf_parse(request_uri)
)with(..);
哈哈,不知道有没有这样的语法
在 2020-06-24 12:24:46,"Jark Wu" 写道:
>你可以在 DDL
不是个参数列表么,多传几个。
>
>或把依赖提前部署到${FLINK_HOME}/plugins里
>
>chenxuying 于2020年4月21日周二 下午3:36写道:
>
>> 这个是可以 , 不过我们的需求不允许打FatJar
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
这个是可以 , 不过我们的需求不允许打FatJar
在 2020-04-21 15:27:48,"Arnold Zai" 写道:
>打个FatJar
>
>chenxuying 于2020年4月21日周二 下午2:47写道:
>
>> 请问下目前flink的启动方式有哪些
>> 1 通过命令行来执行
>> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> cn.xxx.flink
请问下目前flink的启动方式有哪些
1 通过命令行来执行
flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
2通过自带的webui页面上传jar , submit jar
3 通过代码 createRemoteEnvironment
目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api
31 matches
Mail list logo