Re: When a table is created with a CREATE TABLE statement that reads from Kafka, how long is the table's data retained?

2020-12-15, posted by felixzh
The data still lives in Kafka. How long it is retained depends on your Kafka cluster's global retention settings or the configuration of the specific topic.


Flink 1.11 SQL Client does not support checkpoints?

2020-12-15, posted by lingchanhu
The official documentation shows how to enable checkpointing in code, but the SQL Client documentation does not mention checkpoints at all. Does that mean it is not supported?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

UDF type-matching problem

2020-12-15, posted by chen310
The Flink version is 1.11.1.

I created a UDF as follows:


public class FromUnixTime extends ScalarFunction {

  private static final Logger logger =
      LoggerFactory.getLogger(FromUnixTime.class);

  public String eval(long unixTime, String timeZone, String format) {
    try {
      DateFormat dateFormat = new SimpleDateFormat(format);
      dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
      return dateFormat.format(new Date(unixTime));
    } catch (Exception e) {
      logger.error("parse unixTime error, unixTime: " + unixTime +
          ", timeZone: " + timeZone + ", format: " + format, e);
      return null;
    }
  }

  public String eval(long unixTime, String timeZone) {
    return eval(unixTime, timeZone, "yyyy-MM-dd HH:mm:ss");
  }
}



Testing by executing the following statement succeeds normally.


 


But creating a view reports the following error. What is the cause?

create view xx as
select 
  *,
UDF_FromUnixTime(exeAt, 'utc', 'yyyy-MM-ddHH:mm' ) as biz_min
from timeout_topic_source;

Here exeAt is a long-typed timestamp.

The error is as follows:

org.apache.flink.table.api.ValidationException: SQL validation failed.
Invalid function call:
default_catalog.default_database.UDF_FromUnixTime(BIGINT, CHAR(3) NOT NULL,
CHAR(15) NOT NULL)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:148)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCreateView(FlinkSqlInterrpeter.java:343)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:267)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:145)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:114)
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
at
org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Invalid function
call:
default_catalog.default_database.UDF_FromUnixTime(BIGINT, CHAR(3) NOT NULL,
CHAR(15) NOT NULL)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:207)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:93)
at
org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:684)
at
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:448)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:314)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at 
org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
at
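
For comparison, a sketch of a variant of the UDF with boxed parameter types. This is only a hypothesis: under the new type inference in Flink 1.11 a primitive long parameter maps to BIGINT NOT NULL, so if exeAt is a nullable BIGINT column the call above could fail validation even though a literal argument works. The class name and the registration call are assumptions, since the original post does not show how UDF_FromUnixTime was registered.

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical variant: boxed Long/String parameters accept nullable columns,
// whereas a primitive long maps to BIGINT NOT NULL under the 1.11 type inference.
public class FromUnixTimeNullable extends ScalarFunction {

    public String eval(Long unixTime, String timeZone, String format) {
        if (unixTime == null) {
            return null;
        }
        java.text.SimpleDateFormat dateFormat = new java.text.SimpleDateFormat(format);
        dateFormat.setTimeZone(java.util.TimeZone.getTimeZone(timeZone));
        return dateFormat.format(new java.util.Date(unixTime));
    }
}

// Registration (assumed; the original post does not show this step):
// tableEnv.createTemporarySystemFunction("UDF_FromUnixTime", FromUnixTimeNullable.class);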

Re: pyflink UDF fails at runtime

2020-12-15, posted by Xingbo Huang
Hi,

Since you did not provide detailed job information, the stack trace alone only shows that the error was thrown by a Python UDF; more specifically, the string result returned by your Python UDF failed to deserialize on the Java side. Please double-check the corresponding Python UDF.

Best,
Xingbo

Leopard wrote on Wed, Dec 16, 2020 at 9:42 AM:

> pyflink 1.11.1
>
> Fail to run sql command: SELECT
> driverStatus,userId,latitude,locTime,longitude,city_code,ad_code
> ,geo_to_h3(latitude,longitude,7) as
> h3_hash,geo_to_numpy_int_h3(latitude,longitude,7) as h3_code
> FROM lbs_trace CROSS JOIN UNNEST(datas),lateral
> table(split_json(expandInfo)) as T(city_code,ad_code)
> java.io.IOException: Fail to run stream sql job
> at
>
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
> at
>
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
> at
>
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
> at
>
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:494)
> at
>
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:257)
> at
>
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
> at
>
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:111)
> at
>
> org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
> at
>
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:846)
> at
>
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:738)
> at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
> at
>
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
> at
>
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: aa71b252e058bf6b0f5ec15b23d86adc)
> at
>
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
> at
>
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
> at
>
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161)
> ... 16 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: aa71b252e058bf6b0f5ec15b23d86adc)
> at
>
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at
>
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
>
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at
>
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
> at
>
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at
>
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
>
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at
>
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
> at
>
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at
>
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
> at
>
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
> at
>
> 

Re: pyflink third-party library install fails with a permission problem

2020-12-15, posted by Xingbo Huang
Hi,

By default the libraries are installed into the environment that the python interpreter on each machine points to; you can also point to a different Python environment with -pyexec.

Best,
Xingbo

magichuang wrote on Tue, Dec 15, 2020 at 8:02 PM:

> Looking at that error again: Flink has already uploaded requirements.txt and cached_dir to HDFS first, because
> /yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/requirements.txt
>
> and
> /yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/cached_dir
> both exist on the machine at submission time. It is only after the program dies that the
> /yarn/nm/usercache/root/appcache/application_1608026509770_0001 directory disappears, so HDFS itself does not seem to be the problem.
>
> What I would like to ask now: when Flink pulls in external Python dependencies, where does it install the libraries from the offline packages?
>
>
>
>
> Looking at the error message:  Error [Errno 13] Permission denied: '' while executing command
> python setup.py egg_info
>
> The permission problem is reported while running python setup.py.
>
>
>
>
> Could anyone take a look? Thanks.
>
>
>
>
> -- Original message --
>
> From: magichuang
>
> Sent: 2020-12-15 14:15:04
>
> To: user-zh
>
> Cc:
>
> Subject: pyflink third-party library install fails with a permission problem
>
>
>
>
> A question for everyone: running python demo.py locally works fine, but submitting the job to the cluster fails.
>
> Flink version: 1.11, Flink on YARN in cluster mode, submitted in per-job mode, three machines.
>
>
>
>
> Submit command: flink run -m yarn-cluster -ynm demo  -ys 2 -ytm 2048 -p 2 -py demo.py
>
>
>
>
> Code screenshot: https://s3.ax1x.com/2020/12/15/rKIwE6.png
>
>
>
>
> Error screenshot: https://s3.ax1x.com/2020/12/15/rKIlNT.png
>
>
>
>
> requirements.txt: IPy==1.0    cache_dir: IPy-1.00.tar.gz
>
>
>
>
> Custom UDF code:
>
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>
> def judge_ip(ip):
>
> import IPy
>
> if ip in IPy.IP('192.168.112.0/28'):
>
> return 'in'
>
> return 'out'
>
>
>
>
>
>
>
> Best regards~
>
>
>


State keeps growing when using the built-in Flink SQL function LAST_VALUE

2020-12-15, posted by guoliang_wang1335
Using Flink 1.10. Mini-batch is enabled and idleStateRetentionTime is set. The job performs a left
join, and the final sink is insert into table select a, LAST_VALUE(b) group by a;
The relevant configuration is:
val tConfig: TableConfig = tEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
val configuration = tEnv.getConfig().getConfiguration()
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")

configuration.setString("table.exec.mini-batch.size", "5000")


The checkpoint size keeps growing, and the configured IdleStateRetentionTime does not seem to clean up the LAST_VALUE state.

Has anyone run into this problem? How can it be solved?



How to configure RocksDB's block-cache-usage metric in Flink 1.9.1

2020-12-15, posted by bradyMk
Hi, a question for everyone:

I am currently using Flink 1.9 with RocksDB for incremental checkpoints, and I want to configure metrics for the following two items to monitor the job's memory usage:
  ① block-cache-usage
  ② write buffer

But I could not find the corresponding options in the official docs [1]. From what I have read:
  the metric for the write buffer is state.backend.rocksdb.metrics.cur-size-all-mem-tables,
  while the block-cache-usage metric was only added after 1.10 and does not exist in 1.9.

Questions:
① Does the write buffer indeed correspond to the metric state.backend.rocksdb.metrics.cur-size-all-mem-tables?
② If 1.9 has no direct metric for block-cache-usage, how can block-cache-usage be monitored?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics

  






-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL 1.12 writing data to Elasticsearch, deployment problem

2020-12-15, posted by Yangze Guo
You need to put flink-sql-connector-elasticsearch7_2.11-1.12.0.jar there instead.

Best,
Yangze Guo

On Wed, Dec 16, 2020 at 11:34 AM cljb...@163.com  wrote:
>
> hi,
> With Flink SQL 1.12, writing data to Elasticsearch works fine locally, but after deploying it to the server it fails with the error below.
> I checked the packaged jar and it does contain the relevant classes; flink-connector-elasticsearch7_2.11-1.12.0.jar has also been placed under flink/lib.
> I tried adjusting the classloading order, both child-first and parent-first, with no luck.
> Has anyone hit a similar problem?
> Thanks!
>
> The error is:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Could not load service provider for table factories.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
> at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1685)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
> Caused by: org.apache.flink.table.api.TableException: Could not load service 
> provider for table factories.
> at 
> org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:218)
> at 
> org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
> at 
> org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
> at 
> org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:263)
> at 
> org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:90)
> at com.searchrec.main.XfkEsIndex.main(XfkEsIndex.java:24)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
> ... 11 more
> Caused by: java.util.ServiceConfigurationError: 
> org.apache.flink.table.factories.TableFactory: Provider 
> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at 
> org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
> ... 22 more
> Caused by: java.lang.NoClassDefFoundError: 
> org/elasticsearch/common/xcontent/XContentType
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.(ElasticsearchUpsertTableSinkFactoryBase.java:105)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
> ... 26 more
> Caused by: java.lang.ClassNotFoundException: 
> org.elasticsearch.common.xcontent.XContentType
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 33 more
>
>
>
>
> cljb...@163.com


Re: Flink 1.11 DataStream Elasticsearch sink: writing to ES requires username/password authentication. Is this currently supported?

2020-12-15, posted by Yangze Guo
Version 1.11 does not yet support setting a username and password; these two options were added in 1.12 together with the new ES connector [1].

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo
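
For the DataStream API on 1.11, a workaround that is often used is to supply the credentials through a custom RestClientFactory when building the sink. Below is only a rough sketch; the host, user name and password are placeholders, and the HTTP client classes come from the Elasticsearch REST client that ships with the connector.

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;

import java.util.Collections;

public class EsSinkWithBasicAuth {

    // Builds an Elasticsearch 7 sink whose underlying REST client authenticates with basic auth.
    public static ElasticsearchSink<String> build() {
        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                Collections.singletonList(new HttpHost("es-host", 9200, "http")), // placeholder host
                (ElasticsearchSinkFunction<String>) (element, ctx, indexer) -> {
                    // build IndexRequests from the element and hand them to the indexer here
                });

        builder.setRestClientFactory(restClientBuilder -> {
            BasicCredentialsProvider credentials = new BasicCredentialsProvider();
            credentials.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials("es-user", "es-password")); // placeholders
            restClientBuilder.setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentials));
        });

        return builder.build();
    }
}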

On Wed, Dec 16, 2020 at 11:34 AM 李世钰  wrote:
>
> Flink 1.11 DataStream Elasticsearch sink: writing to ES requires username/password authentication. Is this currently supported?
> elasticsearch7.0
>
>
>
>
>
>
> --
>
> --
>
> 李世钰
>
> Mail:m...@lishiyu.cn
>
> TEL:18801236165
>
> Motto: Make the people around you happy, and your own life will be full of happiness!
>
>
>
>
>
>
> 
>
>
>
> 


Re: State keeps growing when using the built-in Flink SQL function LAST_VALUE

2020-12-15, posted by guoliang_wang1335
To add the concrete settings:
Using Flink 1.10. Mini-batch is enabled and idleStateRetentionTime is set; the final sink is insert
into table select a, LAST_VALUE(b) group by a; the relevant configuration is:
val tConfig: TableConfig = tEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
val configuration = tEnv.getConfig().getConfiguration()
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
The state keeps growing.


Has anyone encountered this problem?














On 2020-12-11 16:06:50, "guoliang_wang1335" wrote:
>  Hi,
>  I configured state expiration as shown below. After using the built-in function LAST_VALUE, I observed that lastCheckpointSize keeps growing. Will the settings below take effect for LAST_VALUE? What should I do? Thanks!
> TableConfig tConfig = tableEnv.getConfig();
> tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));


Re: FlinkSQL: the difference between aggregating each select with group by and then union-ing the outputs, versus union-ing the selects first and then aggregating with group by, plus a problem.

2020-12-15, posted by 赵一旦
To make this easier to describe, I have posted the complete SQL and part of the analysis at the address below.

https://www.yuque.com/sixhours-gid0m/eegye3/xrz2mm

Any help with this is appreciated.

赵一旦 wrote on Wed, Dec 16, 2020 at 10:52 AM:

>
> Judging from the source nodes of the two plans there is not much difference. The problem is that when I look at outputWatermark under the Web UI metrics tab, parallel instance 0 of plan 2 has 8 metrics carrying outputWatermark (1 starting with source, 7 starting with calc), whereas plan 3 has only 2.
>
> 赵一旦 wrote on Wed, Dec 16, 2020 at 10:41 AM:
>
>> Does anyone understand this? Here is today's new finding.
>> I looked at the name shown for my source node in the Web UI and split it apart in a text editor; the findings are below.
>> Plan 2:
>>
>> Source: TableSourceScan(table=[[default_catalog, default_database, 
>> baidu_log, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT NULL($1), 
>> CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 6:INTERVAL SECOND)]]], 
>> fields=[cid, server_time, d])
>>  -> (
>>
>>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'77') IS 
>> NOT NULL CASE CAST((d ITEM _UTF-16LE'77')) CASE 
>> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
>> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
>> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>>
>>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'79') IS 
>> NOT NULL CASE CAST((d ITEM _UTF-16LE'79')) CASE 
>> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
>> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
>> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>>
>>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'80') IS 
>> NOT NULL CASE CAST((d ITEM _UTF-16LE'80')) CASE 
>> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
>> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
>> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>>
>>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'81') IS 
>> NOT NULL CASE CAST((d ITEM _UTF-16LE'81')) CASE 
>> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
>> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
>> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>>
>>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'83') IS 
>> NOT NULL CASE CAST((d ITEM _UTF-16LE'83')) CASE 
>> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
>> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
>> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>>
>>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'84') IS 
>> NOT NULL CASE CAST((d ITEM _UTF-16LE'84')) CASE 
>> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
>> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
>> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>>
>>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'86') IS 
>> NOT NULL CASE CAST((d ITEM _UTF-16LE'86')) CASE 
>> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
>> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
>> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time])
>>  )
>>
>> Plan 3:
>>
>> Source: TableSourceScan(table=[[default_catalog, default_database, dr1, 
>> watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT NULL($1), 
>> CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 6:INTERVAL SECOND)]]], 
>> fields=[cid, server_time, d])
>>  -> (
>>
>>   Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT 
>> NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time, (d ITEM 
>> _UTF-16LE'106') AS su
>>
>> pply_id, _UTF-16LE'd107':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, 
>> ((d ITEM _UTF-16LE'107') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'107')) CASE 
>> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 
>> field_value]),
>>
>>
>>   Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT 
>> NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time, (d ITEM 
>> _UTF-16LE'106') AS supply_id, _UTF-16LE'd77':VARCHAR(4) CHARACTER SET 
>> "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'77') IS NOT NULL CASE CAST((d 
>> ITEM _UTF-16LE'77')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET 
>> "UTF-16LE") AS field_value]),
>>
>>
>>   Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT 
>> NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time, (d ITEM 
>> _UTF-16LE'106') AS supply_id, _UTF-16LE'd79':VARCHAR(4) CHARACTER SET 
>> "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'79') IS NOT NULL CASE CAST((d 
>> ITEM _UTF-16LE'79')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET 
>> "UTF-16LE") AS field_value]),
>>
>>
>>   Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT 
>> NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time, (d ITEM 
>> _UTF-16LE'106') AS supply_id, _UTF-16LE'd80':VARCHAR(4) CHARACTER SET 
>> "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'80') IS NOT NULL CASE CAST((d 
>> ITEM _UTF-16LE'80')) 

Flink SQL 1.12 writing data to Elasticsearch, deployment problem

2020-12-15, posted by cljb...@163.com
hi,
With Flink SQL 1.12, writing data to Elasticsearch works fine locally, but after deploying it to the server it fails with the error below.
I checked the packaged jar and it does contain the relevant classes; flink-connector-elasticsearch7_2.11-1.12.0.jar has also been placed under flink/lib.
I tried adjusting the classloading order, both child-first and parent-first, with no luck.
Has anyone hit a similar problem?
Thanks!

The error is:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Could not load service provider for table factories.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1685)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.table.api.TableException: Could not load service 
provider for table factories.
at 
org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:218)
at 
org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
at 
org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
at 
org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:263)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:90)
at com.searchrec.main.XfkEsIndex.main(XfkEsIndex.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
... 11 more
Caused by: java.util.ServiceConfigurationError: 
org.apache.flink.table.factories.TableFactory: Provider 
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at 
org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
... 22 more
Caused by: java.lang.NoClassDefFoundError: 
org/elasticsearch/common/xcontent/XContentType
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.(ElasticsearchUpsertTableSinkFactoryBase.java:105)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 26 more
Caused by: java.lang.ClassNotFoundException: 
org.elasticsearch.common.xcontent.XContentType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 33 more




cljb...@163.com


Flink 1.11 DataStream Elasticsearch sink: writing to ES requires username/password authentication. Is this currently supported?

2020-12-15, posted by 李世钰
Flink 1.11 DataStream Elasticsearch sink: writing to ES requires username/password authentication. Is this currently supported?
elasticsearch7.0






--

--

李世钰

Mail:m...@lishiyu.cn

TEL:18801236165

Motto: Make the people around you happy, and your own life will be full of happiness!








 



Re: Per-job RocksDB configuration not taking effect in Flink 1.9.1

2020-12-15, posted by bradyMk
Hi, thanks for the answer.

I checked the TM logs and the RocksDB state backend is indeed being started.
But why does state.backend still show filesystem under Job Manager --> Configuration in
the Web UI?
Shouldn't it be RocksDB?
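
One likely explanation is that the Job Manager Configuration page only shows the cluster-wide values loaded from flink-conf.yaml, so a state backend configured per job in code is not reflected there even though it is used at runtime. A minimal sketch of such a per-job override, assuming the backend is set in code (the checkpoint URI is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerJobRocksDbExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Per-job override: RocksDB with incremental checkpoints enabled.
        // The checkpoint URI below is a placeholder, not taken from the original post.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        // Trivial pipeline so the sketch actually runs.
        env.fromElements(1, 2, 3).print();
        env.execute("per-job-rocksdb");
    }
}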



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: When will the Flink 1.12 Docker image be available?

2020-12-15, posted by Yang Wang
The community has run into a small problem while pushing the image to Docker Hub and is working on it.
You can follow this PR for the details: https://github.com/docker-library/official-images/pull/9249

For now you can also build an image yourself, as follows:

git clone https://github.com/apache/flink-docker.git
git checkout dev-master
./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
cd dev/flink-1.12.0-debian
docker build . -t flink:flink-1.12.0
docker push flink:flink-1.12.0



jiangjiguang719 wrote on Wed, Dec 9, 2020 at 5:09 PM:

> When will the 1.12 image be visible on Docker Hub?


Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-15, posted by Yang Wang
Take flink-shaded-hadoop-2-uber version 2.8.3-10.0 as an example:

2.8.3 refers to the Hadoop version, and 10.0 refers to the flink-shaded [1] version.

Starting with 1.10 the community no longer recommends the flink-shaded-hadoop approach; instead you submit with the HADOOP_CLASSPATH environment variable set [2].
This keeps Flink Hadoop-free, so it can support both Hadoop 2 and Hadoop 3 at the same time.

If you still insist on using flink-shaded-hadoop, the latest version, 2.8.3-10.0, is recommended.


[1]. https://github.com/apache/flink-shaded
[2].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation

Best,
Yang

赢峰 wrote on Fri, Dec 11, 2020 at 8:45 AM:

> How should the flink-shaded-hadoop-2-uber version be chosen?
>
>
> What do the two parts xxx-xxx mean, respectively?
>
>
>


Re: FlinkSQL: the difference between aggregating each select with group by and then union-ing the outputs, versus union-ing the selects first and then aggregating with group by, plus a problem.

2020-12-15, posted by 赵一旦
Judging from the source nodes of the two plans there is not much difference. The problem is that when I look at outputWatermark under the Web UI metrics tab, parallel instance 0 of plan 2 has 8 metrics carrying outputWatermark (1 starting with source, 7 starting with calc), whereas plan 3 has only 2.

赵一旦 wrote on Wed, Dec 16, 2020 at 10:41 AM:

> Does anyone understand this? Here is today's new finding.
> I looked at the name shown for my source node in the Web UI and split it apart in a text editor; the findings are below.
> Plan 2:
>
> Source: TableSourceScan(table=[[default_catalog, default_database, baidu_log, 
> watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT NULL($1), 
> CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 6:INTERVAL SECOND)]]], 
> fields=[cid, server_time, d])
>  -> (
>
>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'77') IS NOT 
> NULL CASE CAST((d ITEM _UTF-16LE'77')) CASE 
> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>
>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'79') IS NOT 
> NULL CASE CAST((d ITEM _UTF-16LE'79')) CASE 
> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>
>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'80') IS NOT 
> NULL CASE CAST((d ITEM _UTF-16LE'80')) CASE 
> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>
>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'81') IS NOT 
> NULL CASE CAST((d ITEM _UTF-16LE'81')) CASE 
> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>
>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'83') IS NOT 
> NULL CASE CAST((d ITEM _UTF-16LE'83')) CASE 
> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>
>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'84') IS NOT 
> NULL CASE CAST((d ITEM _UTF-16LE'84')) CASE 
> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
>
>   Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'86') IS NOT 
> NULL CASE CAST((d ITEM _UTF-16LE'86')) CASE 
> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, 
> Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE 
> CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time])
>  )
>
> Plan 3:
>
> Source: TableSourceScan(table=[[default_catalog, default_database, dr1, 
> watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT NULL($1), 
> CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 6:INTERVAL SECOND)]]], 
> fields=[cid, server_time, d])
>  -> (
>
>   Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT 
> NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time, (d ITEM 
> _UTF-16LE'106') AS su
>
> pply_id, _UTF-16LE'd107':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, 
> ((d ITEM _UTF-16LE'107') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'107')) CASE 
> _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 
> field_value]),
>
>
>   Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT 
> NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time, (d ITEM 
> _UTF-16LE'106') AS supply_id, _UTF-16LE'd77':VARCHAR(4) CHARACTER SET 
> "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'77') IS NOT NULL CASE CAST((d 
> ITEM _UTF-16LE'77')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AS field_value]),
>
>
>   Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT 
> NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time, (d ITEM 
> _UTF-16LE'106') AS supply_id, _UTF-16LE'd79':VARCHAR(4) CHARACTER SET 
> "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'79') IS NOT NULL CASE CAST((d 
> ITEM _UTF-16LE'79')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AS field_value]),
>
>
>   Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT 
> NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time, (d ITEM 
> _UTF-16LE'106') AS supply_id, _UTF-16LE'd80':VARCHAR(4) CHARACTER SET 
> "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'80') IS NOT NULL CASE CAST((d 
> ITEM _UTF-16LE'80')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE") AS field_value]),
>
>
>   Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT 
> NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS 

Re: Flink 1.11.1 Application Mode job on a K8s cluster, "too old resource version" problem

2020-12-15, posted by Yang Wang
I created a JIRA earlier to track the "too old resource version" issue [1].

Flink currently uses a Watcher to monitor Pod state changes; when the Watcher is closed abnormally, a fatal
error is triggered, which in turn causes the JobManager to restart.

I have run some concrete tests on minikube, on a self-built K8s cluster, and on an Alibaba Cloud ACK cluster; in all of them jobs ran stably for more than a week. I could only reproduce this problem by restarting
the K8s API server. So I suspect the network between your Pods and the API server is unstable, which is why the problem keeps showing up.


[1]. https://issues.apache.org/jira/browse/FLINK-20417

Best,
Yang

lichunguang wrote on Tue, Dec 15, 2020 at 9:25 AM:

> A Flink 1.11.1 job running in Application Mode on a K8s cluster: the JobManager restarts once every hour with the error
> "Fatal error occurred in ResourceManager.
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource version"
>
> Pod restarts:
> 
>
> Restart cause:
> 2020-12-10 07:21:19,290 ERROR
> org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal
> error occurred in ResourceManager.
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 247468999 (248117930)
>   at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_202]
>   at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_202]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
> 2020-12-10 07:21:19,291 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 247468999 (248117930)
>   at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_202]
>   at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_202]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
>
>
> From what I found online, the cause is here, at
> line 212 of the class org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient:
>
> @Override
> public KubernetesWatch watchPodsAndDoCallback(Map labels,
> PodCallbackHandler podCallbackHandler) {
> return new KubernetesWatch(
> this.internalClient.pods()
> .withLabels(labels)
> .watch(new
> KubernetesPodsWatcher(podCallbackHandler)));
> }
>
> while etcd only retains resource version information for a limited period of time
> 【 I think it's standard behavior of Kubernetes to give 410 after some time
> during watch. It's usually client's responsibility to handle it. In the
> context of a watch, it will return HTTP_GONE when you ask to see changes
> for
> a resourceVersion that is too old - i.e. when it can no longer tell you
> what
> has changed since that 

Re: FlinkSQL: the difference between aggregating each select with group by and then union-ing the outputs, versus union-ing the selects first and then aggregating with group by, plus a problem.

2020-12-15, posted by 赵一旦
Does anyone understand this? Here is today's new finding.
I looked at the name shown for my source node in the Web UI and split it apart in a text editor; the findings are below.
Plan 2:
Source: TableSourceScan(table=[[default_catalog, default_database,
baidu_log, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT
NULL($1), CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 6:INTERVAL
SECOND)]]], fields=[cid, server_time, d])
 -> (
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'77')
IS NOT NULL CASE CAST((d ITEM _UTF-16LE'77')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1,
Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE
CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'79')
IS NOT NULL CASE CAST((d ITEM _UTF-16LE'79')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1,
Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE
CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'80')
IS NOT NULL CASE CAST((d ITEM _UTF-16LE'80')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1,
Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE
CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'81')
IS NOT NULL CASE CAST((d ITEM _UTF-16LE'81')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1,
Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE
CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'83')
IS NOT NULL CASE CAST((d ITEM _UTF-16LE'83')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1,
Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE
CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'84')
IS NOT NULL CASE CAST((d ITEM _UTF-16LE'84')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1,
Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE
CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'86')
IS NOT NULL CASE CAST((d ITEM _UTF-16LE'86')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1,
Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE
CAST(server_time) CASE 0:BIGINT) / 1000 AS event_time])
 )

Plan 3:
Source: TableSourceScan(table=[[default_catalog, default_database,
dr1, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT NULL($1),
CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 6:INTERVAL
SECOND)]]], fields=[cid, server_time, d])
 -> (
  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS
NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS
event_time, (d ITEM _UTF-16LE'106') AS su

pply_id, _UTF-16LE'd107':VARCHAR(4) CHARACTER SET "UTF-16LE" AS
field_key, ((d ITEM _UTF-16LE'107') IS NOT NULL CASE CAST((d ITEM
_UTF-16LE'107')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER
SET "UTF-16LE") AS field_value]),

  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS
NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS
event_time, (d ITEM _UTF-16LE'106') AS supply_id,
_UTF-16LE'd77':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d
ITEM _UTF-16LE'77') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'77')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS
field_value]),

  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS
NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS
event_time, (d ITEM _UTF-16LE'106') AS supply_id,
_UTF-16LE'd79':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d
ITEM _UTF-16LE'79') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'79')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS
field_value]),

  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS
NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS
event_time, (d ITEM _UTF-16LE'106') AS supply_id,
_UTF-16LE'd80':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d
ITEM _UTF-16LE'80') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'80')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS
field_value]),

  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS
NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS
event_time, (d ITEM _UTF-16LE'106') AS supply_id,
_UTF-16LE'd81':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d
ITEM _UTF-16LE'81') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'81')) CASE
_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS
field_value]),

  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS
NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000 AS

Re: Flink 1.10.0 on yarn, job submission fails

2020-12-15, posted by Xintong Song
It looks like YARN did not set the hadoop classpath for the application. You can log onto the machine and check the contents of launch_container.sh
to see whether the container launch command contains the correct hadoop classpath. Is your YARN a customized build? Normally the open-source version sets the
hadoop classpath for containers.

Before 1.10 this worked because Flink shipped with a shaded Hadoop. Starting from 1.10, Flink no longer bundles shaded
Hadoop by default and instead uses the cluster's Hadoop dependencies. You can also bundle the shaded Hadoop yourself [1]; that should work as well.

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#adding-hadoop-to-lib

On Sat, Dec 12, 2020 at 8:05 PM Jacob <17691150...@163.com> wrote:

> Hello, what is the cause of this problem when submitting a job with Flink 1.10.0 on yarn? Is it a Hadoop
> jar dependency issue? The program runs on all versions below 1.10 but cannot be submitted on 1.10.0.
>
> Thanks!
> 
>
> [jacob@hadoop001 bin]$ ./yarn logs -applicationId
> application_1603495749855_57650
> 20/12/11 18:52:55 INFO client.RMProxy: Connecting to ResourceManager at
> localhost:8032
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/opt/app/hadoop_client/e11_backend/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/opt/app/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 20/12/11 18:52:57 WARN util.NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>
>
> Container: container_1603495749855_57650_02_01 on localhost
>
> =
> LogType:jobmanager.err
> Log Upload Time:Fri Dec 11 18:49:21 -0800 2020
> LogLength:2368
> Log Contents:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/11/datafeed-website-filter_flink-0.0.1-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/data/hadoop/dn/sde/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/17/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
> Exception in thread "main" java.lang.NoSuchMethodError:
>
> org.apache.hadoop.conf.Configuration.addDeprecations([Lorg/apache/hadoop/conf/Configuration$DeprecationDelta;)V
> at
>
> org.apache.hadoop.mapreduce.util.ConfigUtil.addDeprecatedKeys(ConfigUtil.java:54)
> at
>
> org.apache.hadoop.mapreduce.util.ConfigUtil.loadResources(ConfigUtil.java:42)
> at org.apache.hadoop.mapred.JobConf.(JobConf.java:119)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
>
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1659)
> at
> org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:91)
> at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
> at
>
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> at org.apache.hadoop.security.Groups.(Groups.java:55)
> at
>
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:182)
> at
>
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:235)
> at
>
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:214)
> at
>
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:669)
> at
>
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571)
> at
>
> org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
> at
>
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


How to measure data-processing latency in Flink SQL?

2020-12-15, posted by jindy_liu
In Flink SQL, what is the usual way to measure, within a job, the overall latency of a MySQL
CDC record from entering Flink, through Flink's internal operators, to being written to the external sink system? In other words, the end-to-end latency of the whole job?
We keep calling it real-time, but right now I have no idea what order of magnitude that real-time processing actually is!
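
One rough approach that is sometimes used for this kind of measurement (a sketch only, not specific to the CDC connector in the question): carry the source event time through the pipeline and, near the sink, expose the difference between the current processing time and that event time as a metric. Flink also has a built-in latency marker mechanism (metrics.latency.interval), but the sketch below is the simpler timestamp-diff variant on the DataStream API; the Tuple2 layout with the event time in f0 is an assumption.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

// Assumes each record is a Tuple2 whose f0 field is the source event time in epoch millis.
public class LatencyMeasuringMap extends RichMapFunction<Tuple2<Long, String>, Tuple2<Long, String>> {

    private transient volatile long lastLatencyMs;

    @Override
    public void open(Configuration parameters) {
        // Expose the most recently observed end-to-end latency as a gauge.
        getRuntimeContext()
            .getMetricGroup()
            .gauge("endToEndLatencyMs", (Gauge<Long>) () -> lastLatencyMs);
    }

    @Override
    public Tuple2<Long, String> map(Tuple2<Long, String> record) {
        // Processing-time "now" minus the event time carried in the record.
        lastLatencyMs = System.currentTimeMillis() - record.f0;
        return record;
    }
}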



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Locking twice

2020-12-15, posted by xiaobao li
The runCluster method of org.apache.flink.runtime.entrypoint.ClusterEntrypoint is already locked: synchronized
(lock) guards the entire method body.

Inside that method it then calls initializeServices, which again contains synchronized (lock). Is there any point in locking twice?
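
For illustration only (this is not Flink code): Java intrinsic locks are reentrant, so a thread that already holds the monitor can enter a nested synchronized (lock) block on the same object without blocking. The inner block therefore adds no extra protection when called from runCluster, but it would still protect initializeServices if that method were ever called on its own. A minimal sketch:

public class ReentrantMonitorDemo {

    private final Object lock = new Object();

    public void runCluster() {
        synchronized (lock) {
            // The current thread now owns the monitor of `lock`.
            initializeServices();
        }
    }

    private void initializeServices() {
        // Re-entering the same monitor is allowed: intrinsic locks are reentrant,
        // so this does not block when called from runCluster().
        synchronized (lock) {
            System.out.println("services initialized under the lock");
        }
    }

    public static void main(String[] args) {
        new ReentrantMonitorDemo().runCluster();
    }
}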


Re: Application Mode job on a K8S cluster, unable to scale down

2020-12-15, posted by Yang Wang
I don't quite understand what you mean. Do you want K8s to handle the auto-scaling?

The basic idea of native Flink on K8s is that Flink's ResourceManager actively requests/releases TaskManager Pods from K8s;
if a TaskManager is idle, it is released automatically. This is more flexible, because the TaskManager life cycle
is controlled by the Flink ResourceManager. Also, in the future there may be TaskManagers of different sizes, so using
Pods directly is the better fit.

If you want to adjust the number of TMs based on CPU or memory load, you can use Standalone Flink on K8s [1]. In that case
TaskManagers are managed by a Deployment, and replacing it with a StatefulSet is also possible.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/standalone/kubernetes.html

Best,
Yang


lichunguang wrote on Tue, Dec 15, 2020 at 9:44 PM:

> With the current mode, each pod can in principle be given a different configuration, e.g. CPU and MEM.
> But the overall resource configuration is the same for all of them; wouldn't starting the TM pods as a StatefulSet be more suitable?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Question about the method for submitting user jobs in Flink 1.12

2020-12-15, posted by Yang Wang
org.apache.flink.client.ClientUtils#submitJob is not intended to be used directly by users, so it may be removed during refactoring.

I suggest using org.apache.flink.client.program.rest.RestClusterClient#submitJob instead.

Best,
Yang
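
For reference, a minimal sketch of the suggested replacement, assuming a cluster reachable over REST; StandaloneClusterId, the address and the port are placeholders, and the JobGraph is assumed to be built elsewhere:

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class SubmitViaRestClusterClient {

    // Submits the given JobGraph and returns the JobID once submission completes.
    public static JobID submit(JobGraph jobGraph) throws Exception {
        Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "localhost"); // placeholder
        config.setInteger(RestOptions.PORT, 8081);                // placeholder

        try (RestClusterClient<StandaloneClusterId> client =
                 new RestClusterClient<>(config, StandaloneClusterId.getInstance())) {
            return client.submitJob(jobGraph).get();
        }
    }
}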

陈帅 wrote on Tue, Dec 15, 2020 at 8:28 PM:

> In Flink 1.11, the method used to submit user jobs,
> org.apache.flink.client.ClientUtils.submitJob(ClusterClient, JobGraph), can no longer be found in
> Flink 1.12.
> Which method replaces it now? And how do I get the jobId after submitting the job? Thanks!
>
>
> public static JobExecutionResult submitJob(
>         ClusterClient<?> client,
>         JobGraph jobGraph) throws ProgramInvocationException {
>     checkNotNull(client);
>     checkNotNull(jobGraph);
>     try {
>         return client
>             .submitJob(jobGraph)
>             .thenApply(DetachedJobExecutionResult::new)
>             .get();
>     } catch (InterruptedException | ExecutionException e) {
>         ExceptionUtils.checkInterrupted(e);
>         throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
>     }
> }
>
>
>
> https://github.com/apache/flink/blob/release-1.11/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java#L77
>
> https://github.com/apache/flink/blob/release-1.12/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java


Re: Fwd: Unexpected data after deduplicating two streams and then joining them

2020-12-15, posted by hdxg1101300...@163.com
My thinking is this: the final join is between two result streams, and both of those result streams
are retract streams, so any change on either side produces two messages. For the left side, the first message is the retraction and the second is the updated row. But when the right side changes there are also two records: the false (retract) message joins with the left side and is treated as a change of the whole stream,
so a retraction is emitted and then the joined result is shown; when the true record arrives it joins again, the whole stream is considered changed again, and a retraction plus the new joined result is emitted.
My question is whether it is possible to join only with the records of the right stream whose flag is true.



hdxg1101300...@163.com
 
From: hdxg1101300123
Sent: 2020-12-15 23:44
To: user-zh
Subject: Fwd: Unexpected data after deduplicating two streams and then joining them
-- Forwarded message --
From: hdxg1101300123
Date: Dec 15, 2020, 10:36
Subject: Unexpected data after deduplicating two streams and then joining them
To: user-zh
Cc:
 
> Hello:
> I am using Flink 1.11.2 with Flink SQL to process two streams. Both streams are database change records, so I need to join on the latest data. I therefore apply a row_number = 1 deduplication to each stream
> (SELECT [column_list] FROM ( 
>SELECT [column_list],
>  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>ORDER BY time_attr [asc|desc]) AS rownum
>FROM table_name) 
> WHERE rownum = 1) 
> and then left-join the deduplicated results;
> Initially, when only the left stream changed, everything was fine and the results matched expectations. When the right stream got data, the first record was also as expected, but when the right stream sent another record that changed the row, an unexpected record appeared;
> 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> right> (false,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,1,1607933006000) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,1,1607933006000)
>  
>
> Row 1: the left stream receives data, shown as true; the right stream has no data yet, so the joined fields are null;
> Row 2: the right stream receives data, shown as true (the right stream's result is printed separately);
> Row 3: shows the retraction on the left side;
> Row 4: the left and right records are joined and displayed normally;
> Row 5: the left stream's data changes, so the old row is retracted;
> Row 6: shows the updated data;
> Row 7: the right stream's data changes, so its old row is retracted;
> Row 8: shows the right stream's latest result;
> Row 9: because the right stream changed, the joined (left) result is retracted;
> Rows 10 and 11 are not as expected;
> What should normally happen: when the right stream changes, row 9 retracts the joined result, and then the right stream's latest data joins with the left stream to produce a new result, i.e. the data in row 12 should be emitted next;
> So I would like to ask everyone for advice;
>
> 1607998361520> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> 1607998361520> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
>
> My SQL statement is as follows
> String sql = "SELECT a.sheetId 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
> " 
> sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId,provided,satisfied,score,operateTime
>  " + 
> " from (SELECT 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
> " sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId" 
> + 
> " FROM (SELECT *," + 
> " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
> "   FROM sheetMain)" + 
> " WHERE rownum = 1 ) a" + 
> " left JOIN " + 
> " (select sheetId,provided,satisfied,score,operateTime from (SELECT 
> *," + 
> " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
> "   FROM sheetAnswers)" + 
> " WHERE rownum = 1 ) c" + 
> " ON a.sheetId = c.sheetId " ; 
>
>
>
> hdxg1101300...@163.com 


Re: Unable to sink after a temporal table join

2020-12-15, posted by guoliubi...@foxmail.com
Found the cause: it was a data issue. The earliest offsets of the two Kafka
sources had very different timestamps, so the join kept buffering data while waiting for the other queue's timestamps to catch up. After adjusting the offsets so that the two queues' timestamps lined up, the problem disappeared.



guoliubi...@foxmail.com
 
From: guoliubi...@foxmail.com
Sent: 2020-12-16 07:36
To: user-zh
Subject: Unable to sink after a temporal table join
Hi,
 
The pipeline reads data from two Kafka queues, does a temporal table join, then applies a UDAF over a tumbling window, and finally sinks. The code is roughly as follows:
joined_table = t_env.sql_query("""
SELECT 
o.exchangeCode_ as code,
o.price,
o.volume,
o.eventTime
FROM orders AS o INNER JOIN quotes FOR SYSTEM_TIME AS OF o.eventTime q
ON o.exchangeCode_ = q.exchangeCode_
""")
tumble_window = Tumble.over(expr.lit(500).millis) \
.on(expr.col("eventTime")) \
.alias("w")
aggregate_table = joined_table.window(tumble_window) \
.group_by("w") \
.select("orderCalc(code, price, volume) as aggValue") \
.execute_insert("kafkaSink")
When it runs, all the data piles up at the TemporalJoin stage and never reaches the sink. The execution graph is here:
https://ftp.bmp.ovh/imgs/2020/12/702ccb600bb01968.png
The sink stage's bytes received stays at 0, and the job eventually fails due to running out of memory.
The TaskManager logs show no errors.
 
I would like to ask where to start troubleshooting this kind of problem. Many thanks.
 
 
guoliubi...@foxmail.com


Re: flink-shaded-hadoop-2-uber*-* 版本确定问题

2020-12-15 文章 Yang Wang
你得确认hadoop classpath返回的是完整的,正常情况下hadoop classpath这个命令会把所有的hadoop jar都包含进去的
如果报类或者方法不存在需要确认相应的jar是否存在,并且包含进去了

社区推荐hadoop classpath的方式主要是想让Flink做到hadoop free,这样在hadoop2和hadoop3都可以正常运行了

Best,
Yang

Jacob <17691150...@163.com> 于2020年12月15日周二 上午9:25写道:

> 谢谢回复!
>
> 这个文档我也有查看
>
> 前几日在flink1.9-1.12各个客户端测试提交job时候发现
> 对于1.10+的版本,我手动导入export HADOOP_CLASSPATH=`hadoop
>
> classpath`,没有效果,各种报错,基本都是Hadoop相关类、方法不存在(NoSuchMethod之类错误),把pom文件改来改去依然无用,后来只在pom文件中导入依赖:flink-shaded-hadoop-2-uber*-*,竟然可以正常提交并运行job了。
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


pyflink 定义udf 运行报错

2020-12-15 文章 Leopard
pyflink 1.11.1

Fail to run sql command: SELECT 
driverStatus,userId,latitude,locTime,longitude,city_code,ad_code
,geo_to_h3(latitude,longitude,7) as
h3_hash,geo_to_numpy_int_h3(latitude,longitude,7) as h3_code
FROM lbs_trace CROSS JOIN UNNEST(datas),lateral
table(split_json(expandInfo)) as T(city_code,ad_code)
java.io.IOException: Fail to run stream sql job
at
org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
at
org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
at
org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:494)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:257)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:111)
at
org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:846)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:738)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at
org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: aa71b252e058bf6b0f5ec15b23d86adc)
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
at
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
at
org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161)
... 16 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: aa71b252e058bf6b0f5ec15b23d86adc)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
... 3 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114)
... 19 more
Caused by: 

Flink 1.12.0 写ORC文件,自定义文件名

2020-12-15 文章 Jacob
目前,Flink在写ORC文件时候,可通过OutputFileConfig类配置文件的前缀后缀:.withPartPrefix("prefix")、.withPartSuffix(".ext")

生成的文件格式为:part-<subtaskIndex>-<partFileIndex>

有没有可以完全自定义生成的文件名,比如:"dt=1608006781874",dt=时间戳的形式生成文件,目的是可以直接作为分区load在hive表。后期容易操作hive表。如果是flink默认的文件格式无法load在hive表。
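
(Not from the thread; a minimal sketch of what OutputFileConfig can express today: only the prefix and suffix are configurable, so a "dt=<时间戳>" style name can at best be approximated through the prefix, while the "-<subtaskIndex>-<partFileIndex>" middle section is still appended by Flink.)

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

public class OrcFileNaming {
    public static OutputFileConfig dtPrefixedConfig() {
        // Yields names like "dt=1608006781874-0-0.orc"; the subtask/counter
        // part cannot be removed via OutputFileConfig, so this is only an
        // approximation of a fully custom file name.
        return OutputFileConfig.builder()
                .withPartPrefix("dt=" + System.currentTimeMillis())
                .withPartSuffix(".orc")
                .build();
    }
}

If the real goal is just to have the files land under a dt=... directory that Hive can load as a partition, a custom BucketAssigner (which controls the bucket directory rather than the file name) may be the more natural hook; this is a suggestion, not something confirmed in the thread.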



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: 邮件退订

2020-12-15 文章 Evan
你好,退订需发邮件到 user-zh-unsubscr...@flink.apache.org 更多详细情况可以参考 [1] 
https://flink.apache.org/community.html#mailing-lists



 
发件人: 谢治平
发送时间: 2020-12-16 09:08
收件人: user-zh
主题: 邮件退订
您好,邮件退订一下 


邮件退订

2020-12-15 文章 谢治平
您好,邮件退订一下

temporal table join后无法sink

2020-12-15 文章 guoliubi...@foxmail.com
Hi,

流程是从两个kafka队列中取数据,做完temporal table join后取滚动窗口做UDAF,然后sink,代码大概如下
joined_table = t_env.sql_query("""
SELECT 
o.exchangeCode_ as code,
o.price,
o.volume,
o.eventTime
FROM orders AS o INNER JOIN quotes FOR SYSTEM_TIME AS OF o.eventTime q
ON o.exchangeCode_ = q.exchangeCode_
""")
tumble_window = Tumble.over(expr.lit(500).millis) \
.on(expr.col("eventTime")) \
.alias("w")
aggregate_table = joined_table.window(tumble_window) \
.group_by("w") \
.select("orderCalc(code, price, volume) as aggValue") \
.execute_insert("kafkaSink")
然后执行的时候数据都堆积在TemporalJoin环节,没法进入sink环节。执行图如下
https://ftp.bmp.ovh/imgs/2020/12/702ccb600bb01968.png
最后sink环节的bytes received一直是0,然后运行到最后就因为内存不足失败。
看了taskmanager的日志里面没有报错。

想问下这种问题应该从哪里进行排查,多谢。


guoliubi...@foxmail.com


转发: 两条流去重后再关联出现不符合预期数据

2020-12-15 文章 hdxg1101300123
-- 转发的邮件 --
发件人:hdxg1101300123 
日期:2020年12月15日 10:36
主题:两条流去重后再关联出现不符合预期数据
收件人:user-zh 
抄送:

> 你好: 
>     我在使用flink 
> 1.11.2版本的时候使用flinksql处理两条流。因为两条流都是数据库变更信息,我需要取最新的数据关联;所以分别对两条流做row_number=1 
> (SELECT [column_list] FROM ( 
>    SELECT [column_list],
>  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>    ORDER BY time_attr [asc|desc]) AS rownum
>    FROM table_name) 
> WHERE rownum = 1) 
> 去重后再左关联; 
> 前期当左流变更都没有问题,结果符合预期;当右流有数据时,第一条数据也符合预期,但是右流在发送一条数据出现变更时,出现了一条不符合预期的数据; 
>     
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14
>  15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279) 
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> right> (false,3774bca649224249bdbcb8e7c80b52f9,1,0,8,160793279) 
> right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,1,1607933006000) 
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,160793279)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> left> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,1,1607933006000)
>  
>
> 第1行左流来了数据显示true,此时右流没有数据结果是null; 
> 第2行右流来了数据,显示为true(单独打印了右流的结果); 
> 第3行显示左流撤回; 
> 第4行 左右流数据关联上,正常显示; 
> 第5行 左流数据变更,数据撤回; 
> 第6行 显示变更后的数据; 
> 第7行 右流数据变化,数据撤回; 
> 第8行 显示右流最新的结果; 
> 第9行 因为右流数据变化 所以左流(关联数据)撤回; 
> 第10行 和第11 行 不符合预期; 
> 正常应该是 右流发生变化 第9行 因为右流数据变化 所以左流(关联数据)撤回;然后右流的最新数据和左流产生结果;显示第12行数据才对; 
> 所以想请教一下大家; 
>
> 1607998361520> 
> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
> 1607998361520> 
> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
>  
>
> 我的sql语句如下 
> String sql = "SELECT a.sheetId 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
>     " 
> sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId,provided,satisfied,score,operateTime
>  " + 
>     " from (SELECT 
> sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," + 
>     " sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId" 
> + 
>     " FROM (SELECT *," + 
>     " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
>     "   FROM sheetMain)" + 
>     " WHERE rownum = 1 ) a" + 
>     " left JOIN " + 
>     " (select sheetId,provided,satisfied,score,operateTime from (SELECT 
> *," + 
>     " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime 
> desc) AS rownum " + 
>     "   FROM sheetAnswers)" + 
>     " WHERE rownum = 1 ) c" + 
>     " ON a.sheetId = c.sheetId " ; 
>
>
>
> hdxg1101300...@163.com 


Re: FlinkSQL多个select group by统计union后输出,或多个select union后再group by统计的区别,以及问题。

2020-12-15 文章 赵一旦
方案2没问题,方案3的window算子部分没有watermark。

赵一旦  于2020年12月15日周二 下午10:49写道:

> 具体SQL如下。
> 方案2:
>
>
> INSERT INTO flink_sdk_stats
> (
> SELECT
> DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 
> 'MMddHHmm') AS `time`,
> sid   
>  AS `supply_id`,
> 'd77' 
>  AS `field_key`,
> d77   
>  AS `filed_value`,
> count(1)  
>  AS `pv`
> FROM
> baidu_log_view
> GROUP BY
> sid,
> d77,
> TUMBLE(event_time, INTERVAL '5' MINUTE)
>
> UNION ALL
>
> SELECT
> DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 
> 'MMddHHmm') AS `time`,
> sid   
>  AS `supply_id`,
> 'd79' 
>  AS `field_key`,
> d79   
>  AS `filed_value`,
> count(1)  
>  AS `pv`
> FROM
> baidu_log_view
> GROUP BY
> sid,
> d79,
> TUMBLE(event_time, INTERVAL '5' MINUTE)
>
> UNION ALL
>
> SELECT
> DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 
> 'MMddHHmm') AS `time`,
> sid   
>  AS `supply_id`,
> 'd80' 
>  AS `field_key`,
> d80   
>  AS `filed_value`,
> count(1)  
>  AS `pv`
> FROM
> baidu_log_view
> GROUP BY
> sid,
> d80,
> TUMBLE(event_time, INTERVAL '5' MINUTE)
>
> UNION ALL
>
> SELECT
> DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 
> 'MMddHHmm') AS `time`,
> sid   
>  AS `supply_id`,
> 'd81' 
>  AS `field_key`,
> d81   
>  AS `filed_value`,
> count(1)  
>  AS `pv`
> FROM
> baidu_log_view
> GROUP BY
> sid,
> d81,
> TUMBLE(event_time, INTERVAL '5' MINUTE)
>
> UNION ALL
>
> SELECT
> DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 
> 'MMddHHmm') AS `time`,
> sid   
>  AS `supply_id`,
> 'd83' 
>  AS `field_key`,
> d83   
>  AS `filed_value`,
> count(1)  
>  AS `pv`
> FROM
> baidu_log_view
> GROUP BY
> sid,
> d83,
> TUMBLE(event_time, INTERVAL '5' MINUTE)
>
> UNION ALL
>
> SELECT
> DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 
> 'MMddHHmm') AS `time`,
> sid   
>  AS `supply_id`,
> 'd84' 
>  AS `field_key`,
> d84   
>  AS `filed_value`,
> count(1)  
>  AS `pv`
> FROM
> baidu_log_view
> GROUP BY
> sid,
> d84,
> TUMBLE(event_time, INTERVAL '5' MINUTE)
>
> UNION ALL
>
> SELECT
> DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 
> 'MMddHHmm') AS `time`,
> sid   
>  AS `supply_id`,
> 'd86' 
>  AS `field_key`,
> d86   
>  AS `field_value`,
> count(1)  
>  AS `pv`
> FROM
> baidu_log_view
> GROUP BY
> sid,
> d86,
> TUMBLE(event_time, INTERVAL '5' MINUTE)
> );
>
>
>
> 方案3:
>
>
> INSERT INTO flink_sdk_stats
> SELECT
> DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 
> 'MMddHHmm') AS `time`,
> `supply_id`,
> 

回复:Flink sql 列裁剪原理请教

2020-12-15 文章 SmileSmile
hi,hailongwang
project_remove可以消掉两个连接在一起的projection,如果只投影一个字段,可是经过好几层sql嵌套,底层投影了大量的字段。如何做到更好的列裁剪,这块flink的相关实现是否有?


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年12月15日 22:33,hailongwang 写道:
Hi,
1. projection prune 可查看:CoreRules.PROJECT_REMOVE, 
FlinkLogicalCalcRemoveRule.INSTANCE
2. projection push into tablesource 可查看:PushProjectIntoTableSourceScanRule


Best,
Hailong


在 2020-12-15 20:57:32,"SmileSmile"  写道:
>hi,社区的各位,是否有了解flink sql的列裁剪的实现原理?
>
>通过calcite的rbo可以实现sql优化,calcite的coreRules好像没有实现列裁剪。看一些文章有提到flink有实现projection 
>pushdown。请问下这部分源码对应哪里
>
>Best!
>
>
>| |
>a511955993
>|
>|
>邮箱:a511955...@163.com
>|
>
>签名由 网易邮箱大师 定制


Re: FlinkSQL多个select group by统计union后输出,或多个select union后再group by统计的区别,以及问题。

2020-12-15 文章 赵一旦
具体SQL如下。
方案2:


INSERT INTO flink_sdk_stats
(
SELECT
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'MMddHHmm') AS `time`,
sid
AS `supply_id`,
'd77'
AS `field_key`,
d77
AS `filed_value`,
count(1)
AS `pv`
FROM
baidu_log_view
GROUP BY
sid,
d77,
TUMBLE(event_time, INTERVAL '5' MINUTE)

UNION ALL

SELECT
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'MMddHHmm') AS `time`,
sid
AS `supply_id`,
'd79'
AS `field_key`,
d79
AS `filed_value`,
count(1)
AS `pv`
FROM
baidu_log_view
GROUP BY
sid,
d79,
TUMBLE(event_time, INTERVAL '5' MINUTE)

UNION ALL

SELECT
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'MMddHHmm') AS `time`,
sid
AS `supply_id`,
'd80'
AS `field_key`,
d80
AS `filed_value`,
count(1)
AS `pv`
FROM
baidu_log_view
GROUP BY
sid,
d80,
TUMBLE(event_time, INTERVAL '5' MINUTE)

UNION ALL

SELECT
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'MMddHHmm') AS `time`,
sid
AS `supply_id`,
'd81'
AS `field_key`,
d81
AS `filed_value`,
count(1)
AS `pv`
FROM
baidu_log_view
GROUP BY
sid,
d81,
TUMBLE(event_time, INTERVAL '5' MINUTE)

UNION ALL

SELECT
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'MMddHHmm') AS `time`,
sid
AS `supply_id`,
'd83'
AS `field_key`,
d83
AS `filed_value`,
count(1)
AS `pv`
FROM
baidu_log_view
GROUP BY
sid,
d83,
TUMBLE(event_time, INTERVAL '5' MINUTE)

UNION ALL

SELECT
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'MMddHHmm') AS `time`,
sid
AS `supply_id`,
'd84'
AS `field_key`,
d84
AS `filed_value`,
count(1)
AS `pv`
FROM
baidu_log_view
GROUP BY
sid,
d84,
TUMBLE(event_time, INTERVAL '5' MINUTE)

UNION ALL

SELECT
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'MMddHHmm') AS `time`,
sid
AS `supply_id`,
'd86'
AS `field_key`,
d86
AS `field_value`,
count(1)
AS `pv`
FROM
baidu_log_view
GROUP BY
sid,
d86,
TUMBLE(event_time, INTERVAL '5' MINUTE)
);



方案3:


INSERT INTO flink_sdk_stats
SELECT
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'MMddHHmm') AS `time`,
`supply_id`,
`field_key`,
`field_value`,
count(1) AS `pv`
FROM
(
 SELECT event_time, sid AS `supply_id`, 'd107' AS `field_key`,
d107 AS `field_value` FROM baidu_log_view
 UNION ALL
 SELECT event_time, sid AS `supply_id`, 'd77'  AS `field_key`, d77
 AS `field_value` FROM baidu_log_view
 UNION ALL
 SELECT event_time, sid AS `supply_id`, 'd77'  AS `field_key`, d77
 AS `field_value` FROM baidu_log_view
 UNION ALL
 SELECT event_time, sid AS `supply_id`, 'd79'  AS `field_key`, d79
 AS `field_value` FROM baidu_log_view
 UNION ALL
 SELECT event_time, sid AS `supply_id`, 'd80'  AS `field_key`, d80
 AS `field_value` FROM baidu_log_view
 UNION ALL
 SELECT event_time, sid AS `supply_id`, 'd81'  AS `field_key`, d81
 AS `field_value` FROM baidu_log_view
 UNION ALL
 SELECT event_time, sid AS `supply_id`, 'd83'  AS `field_key`, d83
 AS `field_value` FROM baidu_log_view
 UNION ALL
 SELECT event_time, sid AS `supply_id`, 'd84'  AS `field_key`, d84
 AS `field_value` FROM baidu_log_view
 UNION ALL
 SELECT event_time, sid AS `supply_id`, 'd86'  AS `field_key`, d86
 AS `field_value` FROM baidu_log_view
)
GROUP BY
`supply_id`, `field_key`, `field_value`, TUMBLE(event_time,
INTERVAL '5' MINUTE);


赵一旦  于2020年12月15日周二 下午10:48写道:

>
> 需要,针对某个表,按照key1(xxx+yyy+ky1)、key2(xxx+yyy+ky2)等多组key统计。其中xxx+yyy为共同字段。目前我有如下3种实现。
> (1)每组key分别统计,分别insert。
> (2)每组key分别统计,然后union结果,然后insert。
> (3)针对表多次select,然后union,然后再基于key统计,然后insert。
> 第三种方案中,会将ky1、ky2这几个不同的字段通过
>
> select 'ky1' as key_name, ky1 as key_value
> union
> select 'ky2' as key_name, ky2 as key_value
>
> 的方式统一为key这个字段,最后通过(xxx+yyy+key_name+key_value)的方式统计。
>
> 目前发现个问题,方案3中,window结点一直没有watermark,导致不发生计算。
>
>
>
>


FlinkSQL多个select group by统计union后输出,或多个select union后再group by统计的区别,以及问题。

2020-12-15 文章 赵一旦
需要,针对某个表,按照key1(xxx+yyy+ky1)、key2(xxx+yyy+ky2)等多组key统计。其中xxx+yyy为共同字段。目前我有如下3种实现。
(1)每组key分别统计,分别insert。
(2)每组key分别统计,然后union结果,然后insert。
(3)针对表多次select,然后union,然后再基于key统计,然后insert。
第三种方案中,会将ky1、ky2这几个不同的字段通过

select 'ky1' as key_name, ky1 as key_value
union
select 'ky2' as key_name, ky2 as key_value

的方式统一为key这个字段,最后通过(xxx+yyy+key_name+key_value)的方式统计。

目前发现个问题,方案3中,window结点一直没有watermark,导致不发生计算。


Re:Re: 关于 stream-stream Interval Join 的问题

2020-12-15 文章 hailongwang
Hi,


可以关注:
https://issues.apache.org/jira/browse/FLINK-20162
https://issues.apache.org/jira/browse/FLINK-20387


Best,
Hailong
在 2020-12-15 10:40:19,"赵一旦"  写道:
>补充,实际FROM_UNIXTIME应该返回 TIMESTAMP WITH LOCAL TIME ZONE
>这个类型。(然后FlinkSQL可以自己转为TIMESTAMP)。
>
>此外,关于分窗,除了offset这种显示的由用户来解决时区分窗以外。还可以通过支持 TIMESTAMP WITH LOCAL TIME ZONE
>类型作为 event time 实现,当然内部当然还是通过offset实现,只是FlinkSQL语法层可以基于支持 TIMESTAMP WITH
>LOCAL TIME ZONE 作为eventtime来实现这种效果。
>
>如上是个人观点哈。。。
>
>赵一旦  于2020年12月15日周二 上午11:29写道:
>
>> 这个问题很早前我提过,没人在意,或者说大家没觉得这是个问题。但实际上如果和DataStream
>> API去对比的话,FlinkSQL的这种表现肯定是有问题的。
>>
>> 换种说法,FlinkSQL通过更改ts方式实现了UTC+8时区下的分窗的合理性,但其“实现方式”本身就是“代价”,即使用了不合理的ts,ui上当然就展示不合理的ts。
>>
>> 这本来是应该在window分窗处基于offset实现的功能。
>>
>> 赵一旦  于2020年12月15日周二 上午11:22写道:
>>
>>> 这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。
>>>
>>> 之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time
>>> attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。
>>>
>>> 但是呢,目前不这么做好像也还不行。因为分窗必须基于time
>>> attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。
>>>
>>>
>>> 
>>> 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00
>>> UTC+8
>>> 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。
>>>
>>> 
>>> 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。
>>> 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。
>>>
>>>
>>>
>>> macia kk  于2020年12月11日周五 下午3:04写道:
>>>
 你用的是哪个版本的Flink呢?
 -
 1.11.2

 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
 所以你的binlog是怎么读进来的呢?自定义的format?
 -
 ts 就是时间戳

 bsTableEnv.executeSql("""
   CREATE TABLE input_database (
 `table` STRING,
 `database` STRING,
 `data` ROW(
   reference_id STRING,
   transaction_sn STRING,
   transaction_type BIGINT,
   merchant_id BIGINT,
   transaction_id BIGINT,
   status BIGINT
  ),
 ts BIGINT,
 event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
 WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
  ) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'mytopic',
'connector.properties.bootstrap.servers' = '',
'format.type' = 'json'
  )
 )



 ```



 Benchao Li  于2020年12月10日周四 下午6:14写道:

 > 你用的是哪个版本的Flink呢?
 >
 > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
 > 所以你的binlog是怎么读进来的呢?自定义的format?
 >
 > macia kk  于2020年12月10日周四 上午1:06写道:
 >
 > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS
 event_time
 > -
 > > INTERVAL 'x' HOUR
 > >
 > >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
 > >
 > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
 > > 能够反推出来数据的 currentMaxTimestamp
 > >
 > > currentMaxTimestamp = watermark + maxOutOfOrderness
 > >
 > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
 > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
 > >
 > >
 > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
 > >
 > >
 >
 {"table":"transaction_tab_0122","database":"main_db","transaction_type":1,"transaction_id":11,"reference_id":"11","transaction_sn":"1","merchant_id":1,"status":1,"event_time":"
 > > *2020-12-10T01:02:24Z*"}
 > >
 > > UI 上显示的 watermark 是 1607555031000(Your time zone:
 2020年12月10日星期四早上7点02分
 > > GMT+08:00)
 > >
 > > 这个 watermark 是未来的时间 
 > >
 > >
 > >
 > >
 > >
 > > macia kk  于2020年12月9日周三 下午11:36写道:
 > >
 > > > 感谢 一旦 和 Benchao
 > > >
 > > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join
 上的数据,但是我
 > > Job
 > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
 > > >
 > > > val result = bsTableEnv.sqlQuery("""
 > > >SELECT *
 > > >FROM (
 > > >   SELECT t1.`table`, t1.`database`, t1.transaction_type,
 > > t1.transaction_id,
 > > > t1.reference_id, t1.transaction_sn, t1.merchant_id,
 > > t1.status, t1.event_time
 > > >   FROM main_db as t1
 > > >   LEFT JOIN main_db as t2
 > > >   ON t1.reference_id = t2.reference_id
 > > >   WHERE t1.event_time >= t2.event_time + INTERVAL '5'
 MINUTES
 > > >AND t1.event_time <= t2.event_time - INTERVAL '5'
 MINUTES
 > > >)
 > > >   """.stripMargin)
 > > >
 > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
 > > >
 > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
 > > > subtask的watermark。
 > > > 
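
(Not from the thread; a small self-contained Java check of the UTC+8 behaviour described in the quoted replies above: formatting epoch 0 under GMT+8 yields 08:00 rather than 00:00, which is exactly the fixed 8-hour offset observed on the watermark when a FROM_UNIXTIME-style conversion is used for the event time.)

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class TimezoneCheck {
    public static void main(String[] args) {
        // The same epoch millisecond renders 8 hours "later" under GMT+8 than
        // under UTC, so a TIMESTAMP derived this way carries the local offset.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT+8"));
        System.out.println(fmt.format(new Date(0L))); // 1970-01-01 08:00:00
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(fmt.format(new Date(0L))); // 1970-01-01 00:00:00
    }
}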

Re:Flink sql 列裁剪原理请教

2020-12-15 文章 hailongwang
Hi,
1. projection prune 可查看:CoreRules.PROJECT_REMOVE, 
FlinkLogicalCalcRemoveRule.INSTANCE
2. projection push into tablesource 可查看:PushProjectIntoTableSourceScanRule


Best,
Hailong


在 2020-12-15 20:57:32,"SmileSmile"  写道:
>hi,社区的各位,是否有了解flink sql的列裁剪的实现原理?
>
>通过calcite的rbo可以实现sql优化,calcite的coreRules好像没有实现列裁剪。看一些文章有提到flink有实现projection 
>pushdown。请问下这部分源码对应哪里
>
>Best!
>
>
>| |
>a511955993
>|
>|
>邮箱:a511955...@163.com
>|
>
>签名由 网易邮箱大师 定制


Re: zeppelin+flink1.12问题

2020-12-15 文章 Jeff Zhang
钉钉群里的新版本已经解决了,钉钉群号:32803524

赵一旦  于2020年12月15日周二 下午4:37写道:

> 如题,zeppelin+flink1.12报错(select 1)。
> org.apache.zeppelin.interpreter.InterpreterException:
> org.apache.zeppelin.interpreter.InterpreterException:
> java.lang.NoSuchMethodError:
>
> org.apache.flink.api.common.ExecutionConfig.disableSysoutLogging()Lorg/apache/flink/api/common/ExecutionConfig;
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
> at
>
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:760)
> at
>
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
> at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
> at
>
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
> at
>
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.zeppelin.interpreter.InterpreterException:
> java.lang.NoSuchMethodError:
>
> org.apache.flink.api.common.ExecutionConfig.disableSysoutLogging()Lorg/apache/flink/api/common/ExecutionConfig;
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
> at
>
> org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:355)
> at
>
> org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:366)
> at
>
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.open(FlinkStreamSqlInterpreter.java:47)
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
> ... 8 more
> Caused by: java.lang.NoSuchMethodError:
>
> org.apache.flink.api.common.ExecutionConfig.disableSysoutLogging()Lorg/apache/flink/api/common/ExecutionConfig;
> at
>
> org.apache.zeppelin.flink.FlinkScalaInterpreter.setTableEnvConfig(FlinkScalaInterpreter.scala:444)
> at
>
> org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:114)
> at
> org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:67)
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
> ... 12 more
>


-- 
Best Regards

Jeff Zhang


Flink sql 列裁剪原理请教

2020-12-15 文章 SmileSmile
hi,社区的各位,是否有了解flink sql的列裁剪的实现原理?

通过calcite的rbo可以实现sql优化,calcite的coreRules好像没有实现列裁剪。看一些文章有提到flink有实现projection 
pushdown。请问下这部分源码对应哪里

Best!


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

Re: Application Mode job on K8S集群,无法缩容问题

2020-12-15 文章 lichunguang
现在这种模式下,可以实现每个pod配置不同的资源,如CPU、MEM。
但是整体的资源配置是相同的,是否使用statefulset模式启动TM pod更合适呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.12提交用户任务方法问题

2020-12-15 文章 陈帅
请问 flink 1.11 版本下用于提交用户任务的方法
org.apache.flink.client.ClientUtils.submitJob(ClusterClient, JobGraph) 在 flink
1.12 版本下找不到了,
现在用哪个方法取代了呢?又该如何获取提交任务后的jobId呢?谢谢!


public static JobExecutionResult submitJob(
        ClusterClient<?> client,
        JobGraph jobGraph) throws ProgramInvocationException {
    checkNotNull(client);
    checkNotNull(jobGraph);
    try {
        return client
                .submitJob(jobGraph)
                .thenApply(DetachedJobExecutionResult::new)
                .get();
    } catch (InterruptedException | ExecutionException e) {
        ExceptionUtils.checkInterrupted(e);
        throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
    }
}


https://github.com/apache/flink/blob/release-1.11/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java#L77
https://github.com/apache/flink/blob/release-1.12/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
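
(Not from the thread; a minimal sketch of one 1.12-compatible replacement, assuming you already hold a ClusterClient and a JobGraph: ClusterClient#submitJob returns a CompletableFuture<JobID>, which also answers the jobId question.)

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;

import java.util.concurrent.CompletableFuture;

public class SubmitJobSketch {
    // Submit the JobGraph through the ClusterClient itself and block for the
    // JobID; error handling is left out for brevity.
    public static JobID submit(ClusterClient<?> client, JobGraph jobGraph) throws Exception {
        CompletableFuture<JobID> jobIdFuture = client.submitJob(jobGraph);
        return jobIdFuture.get();
    }
}

If the job is built through a StreamExecutionEnvironment, executeAsync() returning a JobClient (JobClient#getJobID) is another way to obtain the id without touching the ClusterClient directly.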

Re: flink1.9.1单任务配置rocksDB不生效

2020-12-15 文章 Congxian Qiu
Hi
   state.backend 应该是你在 flink-conf 中设置了这个值。具体到你这里的情况,最终的配置是
RocksDB(以代码为准,如果代码没有设置才会使用 flink-conf 中的配置)。你可以看看 TM 日志,应该可以看到更详细的信息
Best,
Congxian


bradyMk  于2020年12月15日周二 下午5:05写道:

> Hi,想请教大家一个问题,我用单任务配置使用rocksDB状态后端,代码如下:
>
> val backend = new RocksDBStateBackend(path, true)
> backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
> env.setStateBackend(backend.asInstanceOf[StateBackend])
>
> 但是运行代码后,去webui查看Job Manager --> Configuration
> 中查看,发现state.backend还是显示filesystem
>
> 这是说明我的配置没有生效嘛?如果没有生效,那么如何进行单任务配置rocksDB呢?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: pyflink 引用第三库的文件出现安装权限的问题

2020-12-15 文章 magichuang
我现在看看那个报错,flink是把requirements.txt  和  cached_dir  已经先上传到hdfs上了,因为  
/yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/requirements.txt
   

 
/yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/cached_dir
  在提交的时候  去看机器上是存在的,只不过等程序挂了,这个 
/yarn/nm/usercache/root/appcache/application_1608026509770_0001文件夹就没了,所以感觉hdfs没有问题。

现在想请教一下,flink在引入外部  python依赖时,在从离线包里面安装库的时候是安装到了哪里?




我看报错信息:  Error [Errno 13] Permission denied: '' while executing command python 
setup.py egg_info

因为它是在  python setup.py  的时候报的权限问题




求大家给看看~~感谢  




-- 原始邮件 --

发 件 人:magichuang 

发送时间:2020-12-15 14:15:04

收 件 人:user-zh 

抄 送:

主 题:pyflink 引用第三库的文件出现安装权限的问题




请教一下大家,在本地直接python demo.py是可以运行的,但是提交到集群就会报错

flink 版本:1.11 flink on yarn集群模式部署, per-job模式提交,三台机器




提交命令:flink run -m yarn-cluster -ynm demo  -ys 2 -ytm 2048 -p 2 -py demo.py




代码截图地址:https://s3.ax1x.com/2020/12/15/rKIwE6.png




报错截图地址:https://s3.ax1x.com/2020/12/15/rKIlNT.png




requirements.txt:  IPy==1.0
cache_dir:  IPy-1.00.tar.gz




自定义udf代码:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def judge_ip(ip):
    import IPy
    if ip in IPy.IP('192.168.112.0/28'):
        return 'in'
    return 'out'







祝好~




flink1.9.1单任务配置rocksDB不生效

2020-12-15 文章 bradyMk
Hi,想请教大家一个问题,我用单任务配置使用rocksDB状态后端,代码如下:

val backend = new RocksDBStateBackend(path, true)
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
env.setStateBackend(backend.asInstanceOf[StateBackend])

但是运行代码后,去webui查看Job Manager --> Configuration
中查看,发现state.backend还是显示filesystem

这是说明我的配置没有生效嘛?如果没有生效,那么如何进行单任务配置rocksDB呢?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


zeppelin+flink1.12问题

2020-12-15 文章 赵一旦
如题,zeppelin+flink1.12报错(select 1)。
org.apache.zeppelin.interpreter.InterpreterException:
org.apache.zeppelin.interpreter.InterpreterException:
java.lang.NoSuchMethodError:
org.apache.flink.api.common.ExecutionConfig.disableSysoutLogging()Lorg/apache/flink/api/common/ExecutionConfig;
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:760)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
at
org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.zeppelin.interpreter.InterpreterException:
java.lang.NoSuchMethodError:
org.apache.flink.api.common.ExecutionConfig.disableSysoutLogging()Lorg/apache/flink/api/common/ExecutionConfig;
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
at
org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:355)
at
org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:366)
at
org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.open(FlinkStreamSqlInterpreter.java:47)
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
... 8 more
Caused by: java.lang.NoSuchMethodError:
org.apache.flink.api.common.ExecutionConfig.disableSysoutLogging()Lorg/apache/flink/api/common/ExecutionConfig;
at
org.apache.zeppelin.flink.FlinkScalaInterpreter.setTableEnvConfig(FlinkScalaInterpreter.scala:444)
at
org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:114)
at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:67)
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
... 12 more