Re: flink1.11 tablefunction

2020-07-21 文章 Benchao Li
Hi,
如果只是想打平array,Flink有个内置的方法,可以参考[1]的”Expanding arrays into a relation“部分

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

Jark Wu  于2020年7月22日周三 上午11:17写道:

> Hi,
>
> Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能:
> https://issues.apache.org/jira/browse/FLINK-17855
>
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 10:45, Dream-底限  wrote:
>
> > hi,
> >  我想将一个array打散成多行,但是并没有成功
> >
> > @FunctionHint(input =@DataTypeHint("ARRAY > STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>") ,output
> > = @DataTypeHint("ROW > STRING,`result` INT,in_path BOOLEAN>"))
> > public static class FlatRowFunction extends TableFunction {
> > private static final long serialVersionUID = 1L;
> >
> > public void eval(Row[] rows) {
> > for (Row row : rows) {
> > collect(row);
> > }
> > }
> > }
> >
> > 异常如下:
> >
> > org.apache.flink.table.api.ValidationException: SQL validation failed.
> > From line 1, column 149 to line 1, column 174: No match found for
> > function signature
> > flatRow( > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> > INTEGER result, BOOLEAN in_path) ARRAY>)
> >
> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> > at
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> > at
> >
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> > at
> >
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> > at
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
> > at
> >
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> > at
> >
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> > at
> >
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> > at
> >
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> > at
> >
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> > at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> > at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> > at
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> > at
> > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> > at
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> > at
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> > at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> > at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> > at
> >
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> > at
> >
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> > at
> >
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> > Caused by: org.apache.calcite.runtime.CalciteContextException: From
> > line 1, column 149 to line 1, column 174: No match found for function
> > signature flatRow( > rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647)
> > rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > Method)
> > at
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > at
> >
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> > at
> > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> > at
> > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> > at
> 

答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 刘首维
Hi JingSong,

  简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL 
SDK
  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子


  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据 
会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的


如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的



发件人: Jingsong Li 
发送时间: 2020年7月22日 13:26:00
收件人: user-zh
抄送: imj...@gmail.com
主题: Re: 关于1.11Flink SQL 全新API设计的一些问题

可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?

Best
Jingsong

On Wed, Jul 22, 2020 at 12:36 PM 刘首维  wrote:

> Hi all,
>
>
>
> 很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
>
> 我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
>
>
>
> 所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
>


--
Best, Jingsong Lee


Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 Jingsong Li
可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?

Best
Jingsong

On Wed, Jul 22, 2020 at 12:36 PM 刘首维  wrote:

> Hi all,
>
>
>
> 很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
>
> 我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
>
>
>
> 所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
>


-- 
Best, Jingsong Lee


Re: 想知道state写到checkpoint文件没有

2020-07-21 文章 Congxian Qiu
Hi
Checkpoint 包括两部分:1)meta 文件;2)具体的数据。如果是 Meta 部分可以参考
CheckpointMetadataLoadingTest[1] 自己写一个测试,如果你知道具体的内容,或许也可以看一下
StatePorcessAPI[2]

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html

Best,
Congxian


sun <1392427...@qq.com> 于2020年7月21日周二 下午12:02写道:

>  请问怎么反编译checkpoint文件啊,我想知道state写到checkpoint文件没有
>
>
>
>
>   _default_
> OPERATOR_STATE_DISTRIBUTION_MODE  SPLIT_DISTRIBUTE 
>  VALUE_SERIALIZER 
> Gorg.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfigzS酿
>   脂? sr
> -org.apache.flink.runtime.state.JavaSerializerFSX韦4
> ? xr
> Borg.apache.flink.api.common.typeutils.base.TypeSerializerSingletony﹪.wE
>  xr 4org.apache.flink.api.common.typeutils.TypeSerializer
>  xp 
> -org.apache.flink.runtime.state.JavaSerializer
> topic-partition-offset-states
> OPERATOR_STATE_DISTRIBUTION_MODE  UNION 
>  VALUE_SERIALIZER 
> Iorg.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshotzS酿
>矛? sr
> ;org.apache.flink.api.java.typeutils.runtime.TupleSerializer 
> xr
> ?org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
>   I  arityI  length[  fieldSerializerst
> 7[Lorg/apache/flink/api/common/typeutils/TypeSerializer;L
> tupleClasst  Ljava/lang/Class;xr
> 4org.apache.flink.api.common.typeutils.TypeSerializer  
>xp  r
> 7[Lorg.apache.flink.api.common.typeutils.TypeSerializer;9?Ч 麡 
> xp  sr
> ?org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>   L  defaultSerializerClassest
> Ljava/util/LinkedHashMap;L  defaultSerializersq ~  L  kryoRegistrationsq ~
>L  registeredTypest  Ljava/util/LinkedHashSet;L
> $registeredTypesWithSerializerClassesq ~L
> registeredTypesWithSerializersq ~L  typeq ~  xq ~  sr
> java.util.LinkedHashMap4繬\ l利   Z  accessOrderxr  java.util.HashMap  诹?`?
> F
> loadFactorI thresholdxp?@   w   
>  x sq ~ ?@   w x sq ~
> ?@   w t
> Iorg.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionsr
>  registeredClassq ~  L  serializableSerializerInstancet
> DLorg/apache/flink/api/common/ExecutionConfig$SerializableSerializer;L
> serializerClassq ~  L  serializerDefinitionTypet
> WLorg/apache/flink/api/java/typeutils/runtime/KryoRegistration$SerializerDefinitionType;xpvr
> Iorg.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
>  [/剀X5   I
> cachedHashI partitionL  topict  Ljava/lang/String;xppp~r
> Uorg.apache.flink.api.java.typeutils.runtime.KryoRegistration$SerializerDefinitionType
>  xr  java.lang.Enum   
>  xpt  UNSPECIFIEDt )org.apache.avro.generic.GenericData$Arraysq ~  vr
> Uorg.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass
> xppvr
> Yorg.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass
> xp~q ~  t  CLASSx pppq ~  sr
> 9org.apache.flink.api.common.typeutils.base.LongSerializer 
> xr
> Borg.apache.flink.api.common.typeutils.base.TypeSerializerSingletony﹪.wE
>  xq ~  vr org.apache.flink.api.java.tuple.Tuple2 
>  L  f0t  Ljava/lang/Object;L  f1q ~ )xr
> %org.apache.flink.api.java.tuple.Tuple 
> xp;  ?  ?  )
>3  sr
> ?org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>   L  defaultSerializerClassest
> Ljava/util/LinkedHashMap;L  defaultSerializersq ~  L  kryoRegistrationsq ~
> L  registeredTypest  Ljava/util/LinkedHashSet;L
> $registeredTypesWithSerializerClassesq ~  L
> registeredTypesWithSerializersq ~  L  typet  Ljava/lang/Class;xr
> 4org.apache.flink.api.common.typeutils.TypeSerializer  
>xpsr  java.util.LinkedHashMap4繬\ l利   Z  accessOrderxr
> java.util.HashMap  诹?`?  F
> loadFactorI thresholdxp?@   w   
>  x sq ~  ?@   w x sq ~
> ?@   w t
> Iorg.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionsr
>  registeredClassq ~  L  serializableSerializerInstancet
> DLorg/apache/flink/api/common/ExecutionConfig$SerializableSerializer;L
> serializerClassq ~  L  serializerDefinitionTypet
> WLorg/apache/flink/api/java/typeutils/runtime/KryoRegistration$SerializerDefinitionType;xpvr
> Iorg.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
>  [/剀X5   I
> cachedHashI partitionL  topict  Ljava/lang/String;xppp~r
> Uorg.apache.flink.api.java.typeutils.runtime.KryoRegistration$SerializerDefinitionType
>  xr  java.lang.Enum   
>  xpt  UNSPECIFIEDt )org.apache.avro.generic.GenericData$Arraysq ~ vr
> Uorg.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass
> xppvr
> Yorg.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass
> xp~q ~  t  CLASSx pppq ~   
> \org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshotzS酿
>3  sr
> 

Re: Re: Flink catalog的几个疑问

2020-07-21 文章 dixingxin...@163.com
@Godfrey @Jingsong 感谢回复,很好的解答了我的疑惑!
背景是这样的,目前我们正打算实现一套支持持久化的catalog,同时基于这个catalog实现一个metaserver,对外暴露REST接口,用来支持日常管理操作,比如:
1.基于原生DDL管理source,sink,支持多种connector,并将这些元数据持久化到mysql中。
2.做统一的权限控制

我们面临两种选择:
1.基于hive catalog建设自己的catalog(或者说直接使用hive catalog):
优势:鉴于hive catalog已经相对比较完善,直接使用可以减少开发量。
劣势:不太明确社区对hive catalog的定位;大小写不敏感带来的麻烦。(大致是之前提到的3个问题)

2.完全自建catalog:
优势:灵活可控;依然可以利用已有的catalog
劣势:设计开发成本高,引入大量代码可能需要持续维护(比如后续catalog 
api发生变动);同时如果社区后续提供官方的catalog默认实现,我们会再次面临是否切换的问题。

目前我们是倾向于自建catalog的。

@Jark 默认的catalog应该算是个通用的需求,感觉在批流一体的大势下,是挺重要的一步(目前hive 
catalog可能还不够)。另外很多公司都在基于开源Flink做计算平台,如果Flink有默认catalog并提供metaserver,那么无疑是十分友好的。
我们优先实现内部版本,实现既定目标。有机会的话,我们也希望能回馈社区。

@All 目前我们想的还不够多,考虑可能不全面,还希望大家给些建议。



Best,
Xingxing Di
 
Sender: Jark Wu
Send Time: 2020-07-22 11:22
Receiver: user-zh
Subject: Re: Flink catalog的几个疑问
非常欢迎贡献开源一个轻量的 catalog 实现 :)
 
On Wed, 22 Jul 2020 at 10:53, Jingsong Li  wrote:
 
> Hi,
>
> HiveCatalog就是官方唯一的可以保存所有表的持久化Catalog,包括kafka,jdbc,hbase等等connectors。
>
> > 后续有可能转正为flink 默认的catalog实现吗?
>
> 目前不太可能,你看,Flink连Hadoop的依赖都没有打进来。Hive的依赖更不会默认打进来。 依赖都没有,也不会成为默认的。
>
> > hive catalog是不支持大小写敏感的
>
> 是的,就像Godfrey说的,特别是JDBC对接的某些大小写敏感的db,这可能导致字段名对应不了。
>
> Best,
> Jingsong
>
> On Wed, Jul 22, 2020 at 10:39 AM godfrey he  wrote:
>
> > hi Xingxing,
> >
> > 1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
> > postgres catalog,
> > 可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
> > 2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
> > catalog写新的meta。
> > 是否会转为默认catalog,据我所知,目前没有。
> > 3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。
> >
> > Best,
> > Godfrey
> >
> > dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:
> >
> > > Hi Flink社区:
> > > 有几个疑问希望社区小伙伴们帮忙解答一下:
> > >
> > >
> >
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> > > 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> > > 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
> > >
> > >
> > >
> > >
> > > Best,
> > > Xingxing Di
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: flink1.11 web ui没有DAG

2020-07-21 文章 Congxian Qiu
Hi
这边说的 UI 上不显示数据接受和发送的条数,能否截图发一下,这样大家能更好的理解这个问题。另外 flink 作业有数据输入和处理吗?

Best,
Congxian


小学生 <201782...@qq.com> 于2020年7月22日周三 上午10:47写道:

> 本地linux下单机版安装的,提交flink代码运行后,正常运行,有日志,但是为啥UI上面却不显示数据接收和发送的条数,求大佬解答


Re: flink1.11任务启动

2020-07-21 文章 Congxian Qiu
Hi
你可以把的启动命令贴一下,然后说一下你期望的行为是什么,现在看到的行为是什么。
Best,
Congxian


酷酷的浑蛋  于2020年7月22日周三 下午12:43写道:

> 现在启动任务(yarn)怎么指定-ys   和-p  都不管用了?  自动就分配好多core?
> 默认启动个任务给我启动了100个core?100个container?我擦,啥情况啊,现在指定什么参数才生效啊?我默认配置也没有配置过多少个core啊,默认不是1个吗


回复:flink1.11启动问题

2020-07-21 文章 JasonLee
Hi
报错显示的是资源不足了 你确定yarn上的资源是够的吗 看下是不是节点挂了 1.11我这边提交任务都是正常的


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月21日 16:36,酷酷的浑蛋 写道:


服了啊,这个flink1.11启动怎么净是问题啊


我1.7,1.8,1.9 都没有问题,到11就不行
./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm 1024 
-ynm sql_test ./examples/batch/WordCount.jar --input 
hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a


报错:
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate the required slot within slot request timeout. Please make 
sure that the cluster has enough resources. at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
 ... 45 more Caused by: java.util.concurrent.CompletionException: 
java.util.concurrent.TimeoutException at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) 
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 ... 25 more


我资源是足的啊,就flink1.11起不来,一直卡在那里,卡好久然后报这个错,大神们帮看看吧,昨天的jar包冲突问题已经解决(只有flink1.11存在的问题),



Re: flink1.11启动问题

2020-07-21 文章 Yang Wang
可以的话,发一下client端和JM端的log

1.11是对提交方式有一些变化,但应该都是和之前兼容的,你的提交命令看着也是没有问题的
我自己试了一下也是可以正常运行的


Best,
Yang

酷酷的浑蛋  于2020年7月22日周三 上午11:06写道:

> jm里面没有日志啊,关键是配置都是一样的,我在1.9里运行就没问题,在flink1.11就一直卡在那里,不分配资源,到底启动方式改变了啥呢?
> 集群资源是有的,可是任务一直卡在那说没资源,这怎么办
>
>
>
>
> 在2020年07月21日 17:22,Shuiqiang Chen 写道:
> Hi,
>
> 可以尝试在jm的log里看看是在申请哪个资源的时候超时了, 对比下所申请的资源规格和集群可用资源
>
> Best,
> Shuiqiang
>
> 酷酷的浑蛋  于2020年7月21日周二 下午4:37写道:
>
>
>
> 服了啊,这个flink1.11启动怎么净是问题啊
>
>
> 我1.7,1.8,1.9 都没有问题,到11就不行
> ./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm
> 1024 -ynm sql_test ./examples/batch/WordCount.jar --input
> hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a
>
>
> 报错:
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources. at
>
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ... 45 more Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.TimeoutException at
>
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
>
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
>
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> ... 25 more
>
>
>
>
> 我资源是足的啊,就flink1.11起不来,一直卡在那里,卡好久然后报这个错,大神们帮看看吧,昨天的jar包冲突问题已经解决(只有flink1.11存在的问题),
>
>
>


回复:flink1.11启动问题

2020-07-21 文章 JasonLee
HI
你使用的什么模式?启动任务的命令发出来看一下吧


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月22日 12:44,酷酷的浑蛋 写道:
现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?  





flink1.11启动问题

2020-07-21 文章 酷酷的浑蛋
现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?  





flink1.11任务启动

2020-07-21 文章 酷酷的浑蛋
现在启动任务(yarn)怎么指定-ys   和-p  都不管用了?  自动就分配好多core? 
默认启动个任务给我启动了100个core?100个container?我擦,啥情况啊,现在指定什么参数才生效啊?我默认配置也没有配置过多少个core啊,默认不是1个吗

关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 刘首维
Hi all,



很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~

我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL 
SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。



所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)


答复: Flink catalog的几个疑问

2020-07-21 文章 刘首维
hi all, 我在想如果社区提供一个unified metastore 
server是不是会解决这个问题,然后写一个(一系列)catalog和这个metastore对应


发件人: Jark Wu 
发送时间: 2020年7月22日 11:22:56
收件人: user-zh
主题: Re: Flink catalog的几个疑问

非常欢迎贡献开源一个轻量的 catalog 实现 :)

On Wed, 22 Jul 2020 at 10:53, Jingsong Li  wrote:

> Hi,
>
> HiveCatalog就是官方唯一的可以保存所有表的持久化Catalog,包括kafka,jdbc,hbase等等connectors。
>
> > 后续有可能转正为flink 默认的catalog实现吗?
>
> 目前不太可能,你看,Flink连Hadoop的依赖都没有打进来。Hive的依赖更不会默认打进来。 依赖都没有,也不会成为默认的。
>
> > hive catalog是不支持大小写敏感的
>
> 是的,就像Godfrey说的,特别是JDBC对接的某些大小写敏感的db,这可能导致字段名对应不了。
>
> Best,
> Jingsong
>
> On Wed, Jul 22, 2020 at 10:39 AM godfrey he  wrote:
>
> > hi Xingxing,
> >
> > 1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
> > postgres catalog,
> > 可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
> > 2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
> > catalog写新的meta。
> > 是否会转为默认catalog,据我所知,目前没有。
> > 3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。
> >
> > Best,
> > Godfrey
> >
> > dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:
> >
> > > Hi Flink社区:
> > > 有几个疑问希望社区小伙伴们帮忙解答一下:
> > >
> > >
> >
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> > > 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> > > 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
> > >
> > >
> > >
> > >
> > > Best,
> > > Xingxing Di
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: flink1.11 sql

2020-07-21 文章 Leonard Xu
Hi

必须可以呢,参考[1]

Best,
Leonard Xu

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/hive/hive_functions.html
 



> 在 2020年7月22日,12:14,Dream-底限  写道:
> 
> hi
> flink支持配置hive方言,那么flink可以直接使用hive内自定义的udf、udtf函数吗



flink1.11 sql

2020-07-21 文章 Dream-底限
hi
flink支持配置hive方言,那么flink可以直接使用hive内自定义的udf、udtf函数吗


flink1.11 实现tablefunction报错

2020-07-21 文章 Dream-底限
hi、
我这面实现了一个tablefunction想打撒数据,但是现在我运行官方demo样式的demo都无法成功,请问下面是什么原因:

@FunctionHint(output = @DataTypeHint("ROW"))
public static class FlatRowFunction extends TableFunction {
private static final long serialVersionUID = 1L;

public void eval(String rows) {
for (String row : rows.split("-->")) {
collect(Row.of(row));
}
}
}

sql使用:from parser_data_test, LATERAL TABLE(flatRow(data.path))

异常:

Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation failed.
>From line 1, column 154 to line 1, column 171: No match found for
function signature flatRow()
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
at com.akulaku.data.flink.ParserDataTest.main(ParserDataTest.java:60)
Caused by: org.apache.calcite.runtime.CalciteContextException: From
line 1, column 154 to line 1, column 171: No match found for function
signature flatRow()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
match found for function signature flatRow()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 

Re: Flink catalog的几个疑问

2020-07-21 文章 Jark Wu
非常欢迎贡献开源一个轻量的 catalog 实现 :)

On Wed, 22 Jul 2020 at 10:53, Jingsong Li  wrote:

> Hi,
>
> HiveCatalog就是官方唯一的可以保存所有表的持久化Catalog,包括kafka,jdbc,hbase等等connectors。
>
> > 后续有可能转正为flink 默认的catalog实现吗?
>
> 目前不太可能,你看,Flink连Hadoop的依赖都没有打进来。Hive的依赖更不会默认打进来。 依赖都没有,也不会成为默认的。
>
> > hive catalog是不支持大小写敏感的
>
> 是的,就像Godfrey说的,特别是JDBC对接的某些大小写敏感的db,这可能导致字段名对应不了。
>
> Best,
> Jingsong
>
> On Wed, Jul 22, 2020 at 10:39 AM godfrey he  wrote:
>
> > hi Xingxing,
> >
> > 1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
> > postgres catalog,
> > 可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
> > 2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
> > catalog写新的meta。
> > 是否会转为默认catalog,据我所知,目前没有。
> > 3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。
> >
> > Best,
> > Godfrey
> >
> > dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:
> >
> > > Hi Flink社区:
> > > 有几个疑问希望社区小伙伴们帮忙解答一下:
> > >
> > >
> >
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> > > 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> > > 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
> > >
> > >
> > >
> > >
> > > Best,
> > > Xingxing Di
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: flink1.11 tablefunction

2020-07-21 文章 Jark Wu
Hi,

Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能:
https://issues.apache.org/jira/browse/FLINK-17855


Best,
Jark

On Wed, 22 Jul 2020 at 10:45, Dream-底限  wrote:

> hi,
>  我想将一个array打散成多行,但是并没有成功
>
> @FunctionHint(input =@DataTypeHint("ARRAY STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>") ,output
> = @DataTypeHint("ROW STRING,`result` INT,in_path BOOLEAN>"))
> public static class FlatRowFunction extends TableFunction {
> private static final long serialVersionUID = 1L;
>
> public void eval(Row[] rows) {
> for (Row row : rows) {
> collect(row);
> }
> }
> }
>
> 异常如下:
>
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 1, column 149 to line 1, column 174: No match found for
> function signature
> flatRow( VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> INTEGER result, BOOLEAN in_path) ARRAY>)
>
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
> at
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From
> line 1, column 149 to line 1, column 174: No match found for function
> signature flatRow( rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647)
> rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> at
> 

Re: Flink catalog的几个疑问

2020-07-21 文章 godfrey he
hi Xingxing,

1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
postgres catalog,
可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
catalog写新的meta。
是否会转为默认catalog,据我所知,目前没有。
3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。

Best,
Godfrey

dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:

> Hi Flink社区:
> 有几个疑问希望社区小伙伴们帮忙解答一下:
>
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
>
>
>
>
> Best,
> Xingxing Di
>


回复: flink1.11启动问题

2020-07-21 文章 酷酷的浑蛋
jm里面没有日志啊,关键是配置都是一样的,我在1.9里运行就没问题,在flink1.11就一直卡在那里,不分配资源,到底启动方式改变了啥呢? 
集群资源是有的,可是任务一直卡在那说没资源,这怎么办




在2020年07月21日 17:22,Shuiqiang Chen 写道:
Hi,

可以尝试在jm的log里看看是在申请哪个资源的时候超时了, 对比下所申请的资源规格和集群可用资源

Best,
Shuiqiang

酷酷的浑蛋  于2020年7月21日周二 下午4:37写道:



服了啊,这个flink1.11启动怎么净是问题啊


我1.7,1.8,1.9 都没有问题,到11就不行
./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm
1024 -ynm sql_test ./examples/batch/WordCount.jar --input
hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a


报错:
Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout. Please
make sure that the cluster has enough resources. at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
... 45 more Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.TimeoutException at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
... 25 more



我资源是足的啊,就flink1.11起不来,一直卡在那里,卡好久然后报这个错,大神们帮看看吧,昨天的jar包冲突问题已经解决(只有flink1.11存在的问题),




Re: flinksql1.11中主键声明的问题

2020-07-21 文章 Leonard Xu
Hi,

你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。
在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。
我理解你把connector的with参数更新成新的就解决问题了。

Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
 

> 
> def register_rides_source(st_env):
>source_ddl = \
>"""
>create table source1(
> id int,
> time1 varchar ,
> type string
> ) with (
>'connector.type' = 'kafka',
>'connector.topic' = 'tp1',
>'connector.startup-mode' = 'latest-offset',
>'connector.properties.bootstrap.servers' = 'localhost:9092',
>'connector.properties.zookeeper.connect' = 'localhost:2181',
>'format.type' = 'json',
>'connector.version' = 'universal',
>'update-mode' = 'append'
> )
>“"" 


Re: Flink catalog的几个疑问

2020-07-21 文章 Jingsong Li
Hi,

HiveCatalog就是官方唯一的可以保存所有表的持久化Catalog,包括kafka,jdbc,hbase等等connectors。

> 后续有可能转正为flink 默认的catalog实现吗?

目前不太可能,你看,Flink连Hadoop的依赖都没有打进来。Hive的依赖更不会默认打进来。 依赖都没有,也不会成为默认的。

> hive catalog是不支持大小写敏感的

是的,就像Godfrey说的,特别是JDBC对接的某些大小写敏感的db,这可能导致字段名对应不了。

Best,
Jingsong

On Wed, Jul 22, 2020 at 10:39 AM godfrey he  wrote:

> hi Xingxing,
>
> 1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
> postgres catalog,
> 可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
> 2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
> catalog写新的meta。
> 是否会转为默认catalog,据我所知,目前没有。
> 3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。
>
> Best,
> Godfrey
>
> dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:
>
> > Hi Flink社区:
> > 有几个疑问希望社区小伙伴们帮忙解答一下:
> >
> >
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> > 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> > 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
> >
> >
> >
> >
> > Best,
> > Xingxing Di
> >
>


-- 
Best, Jingsong Lee


flink1.11 web ui????DAG

2020-07-21 文章 ??????
linuxflink??UI

Re: flink1.11 tablefunction

2020-07-21 文章 Dream-底限
hi,
 我想将一个array打散成多行,但是并没有成功

@FunctionHint(input =@DataTypeHint("ARRAY>") ,output
= @DataTypeHint("ROW"))
public static class FlatRowFunction extends TableFunction {
private static final long serialVersionUID = 1L;

public void eval(Row[] rows) {
for (Row row : rows) {
collect(row);
}
}
}

异常如下:

org.apache.flink.table.api.ValidationException: SQL validation failed.
>From line 1, column 149 to line 1, column 174: No match found for
function signature
flatRow()

at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
at 
com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: org.apache.calcite.runtime.CalciteContextException: From
line 1, column 149 to line 1, column 174: No match found for function
signature flatRow()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 

flinksql1.11中主键声明的问题

2020-07-21 文章 1129656...@qq.com
hi: 
我在使用pyflink1.11过程中,使用flinksql维表时声明了主键primary key 
但是还是会报错说我没有用声明主键,另外,当我使用inner join代替left join就不会有这个问题,请问这是什么问题
下面我附录了报错信息和代码。谢谢!

报错附录
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", 
line 147, in deco
return f(*a, **kw)
  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in 
get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.
: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires 
that Table has a full primary keys if it is updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "mysql_join.py", line 90, in 
from_kafka_to_kafka_demo()
  File "mysql_join.py", line 22, in from_kafka_to_kafka_demo
st_env.execute("2-from_kafka_to_kafka")
  File 
"/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", 
line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", 
line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'UpsertStreamTableSink requires that 
Table has a full primary keys if it is updated.'

代码附录

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, 
EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble 


def from_kafka_to_kafka_demo():

# use blink table planner
env = StreamExecutionEnvironment.get_execution_environment()

flink1.11 web ui????DAG

2020-07-21 文章 ??????
linuxflink??UI

Re: flink-1.11 ddl kafka-to-hive问题

2020-07-21 文章 Jingsong Li
你的Source表是怎么定义的?确定有watermark前进吗?(可以看Flink UI)

'sink.partition-commit.trigger'='partition-time' 去掉试试?

Best,
Jingsong

On Wed, Jul 22, 2020 at 12:02 AM Leonard Xu  wrote:

> HI,
>
> Hive 表时在flink里建的吗? 如果是建表时使用了hive dialect吗?可以参考[1]设置下
>
> Best
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
> >
>
> > 在 2020年7月21日,22:57,kcz <573693...@qq.com> 写道:
> >
> > 一直都木有数据 我也不知道哪里不太对 hive有这个表了已经。我测试写ddl hdfs 是OK的
> >
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: JasonLee <17610775...@163.com 
> > 发送时间: 2020年7月21日 20:39
> > 收件人: user-zh mailto:user-zh@flink.apache.org
> >
> > 主题: 回复:flink-1.11 ddl kafka-to-hive问题
> >
> >
> >
> > hi
> > hive表是一直没有数据还是过一段时间就有数据了?
> >
> >
> > | |
> > JasonLee
> > |
> > |
> > 邮箱:17610775...@163.com
> > |
> >
> > Signature is customized by Netease Mail Master
> >
> > 在2020年07月21日 19:09,kcz 写道:
> > hive-1.2.1
> > chk 已经成功了(去chk目录查看了的确有chk数据,kafka也有数据),但是hive表没有数据,我是哪里缺少了什么吗?
> > String hiveSql = "CREATE TABLE stream_tmp.fs_table (\n" +
> >  " host STRING,\n" +
> >  " url STRING," +
> >  " public_date STRING" +
> >  ") partitioned by (public_date
> string) " +
> >  "stored as PARQUET " +
> >  "TBLPROPERTIES (\n" +
> >  "
> 'sink.partition-commit.delay'='0 s',\n" +
> >  "
> 'sink.partition-commit.trigger'='partition-time',\n" +
> >  "
> 'sink.partition-commit.policy.kind'='metastore,success-file'" +
> >  ")";
> > tableEnv.executeSql(hiveSql);
> >
> >
> > tableEnv.executeSql("INSERT INTO stream_tmp.fs_table SELECT host,
> url, DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");
>
>

-- 
Best, Jingsong Lee


Re: flink-1.11 ddl kafka-to-hive问题

2020-07-21 文章 Leonard Xu
HI,

Hive 表时在flink里建的吗? 如果是建表时使用了hive dialect吗?可以参考[1]设置下

Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
 


> 在 2020年7月21日,22:57,kcz <573693...@qq.com> 写道:
> 
> 一直都木有数据 我也不知道哪里不太对 hive有这个表了已经。我测试写ddl hdfs 是OK的
> 
> 
> 
> 
> 
> -- 原始邮件 --
> 发件人: JasonLee <17610775...@163.com 
> 发送时间: 2020年7月21日 20:39
> 收件人: user-zh mailto:user-zh@flink.apache.org>
> 主题: 回复:flink-1.11 ddl kafka-to-hive问题
> 
> 
> 
> hi
> hive表是一直没有数据还是过一段时间就有数据了?
> 
> 
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
> 
> Signature is customized by Netease Mail Master
> 
> 在2020年07月21日 19:09,kcz 写道:
> hive-1.2.1
> chk 已经成功了(去chk目录查看了的确有chk数据,kafka也有数据),但是hive表没有数据,我是哪里缺少了什么吗?
> String hiveSql = "CREATE TABLE stream_tmp.fs_table (\n" +
>  " host STRING,\n" +
>  " url STRING," +
>  " public_date STRING" +
>  ") partitioned by (public_date string) " 
> +
>  "stored as PARQUET " +
>  "TBLPROPERTIES (\n" +
>  " 'sink.partition-commit.delay'='0 
> s',\n" +
>  " 
> 'sink.partition-commit.trigger'='partition-time',\n" +
>  " 
> 'sink.partition-commit.policy.kind'='metastore,success-file'" +
>  ")";
> tableEnv.executeSql(hiveSql);
> 
> 
> tableEnv.executeSql("INSERT INTO stream_tmp.fs_table SELECT host, url, 
> DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");



Flink catalog的几个疑问

2020-07-21 文章 dixingxin...@163.com
Hi Flink社区:
有几个疑问希望社区小伙伴们帮忙解答一下:
1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗? 
3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。




Best,
Xingxing Di


回复:flink-1.11 ddl kafka-to-hive问题

2020-07-21 文章 kcz
一直都木有数据 我也不知道哪里不太对 hive有这个表了已经。我测试写ddl hdfs 是OK的





-- 原始邮件 --
发件人: JasonLee <17610775...@163.com
发送时间: 2020年7月21日 20:39
收件人: user-zh 

Re: FlinkKafkaConsumer API 维表关联

2020-07-21 文章 Jark Wu
你需要用 DDL 去声明这张 kafka 表[1], 目前不建议使用 `Kafka` 和 `StreamTableDescriptor` API。

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

On Thu, 16 Jul 2020 at 11:43, 郑斌斌  wrote:

> 各位好:
>
> 请教一下,用FlinkKafkaConsumer API的话,如何支持SQL的方式,和维表关联。(之前用Kafka
> API API是可以的)
>  "select  a.id,b.name from kafka_table a "
> + "join dim_table FOR SYSTEM_TIME AS OF a.proctime as b on a.id =
> b.user_id";
>
> thanks & Regards


Re: flink-1.11 ddl kafka-to-hive问题

2020-07-21 文章 Jark Wu
rolling 策略配一下?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#sink-rolling-policy-rollover-interval

Best,
Jark

On Tue, 21 Jul 2020 at 20:38, JasonLee <17610775...@163.com> wrote:

> hi
> hive表是一直没有数据还是过一段时间就有数据了?
>
>
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年07月21日 19:09,kcz 写道:
> hive-1.2.1
> chk 已经成功了(去chk目录查看了的确有chk数据,kafka也有数据),但是hive表没有数据,我是哪里缺少了什么吗?
> String hiveSql = "CREATE  TABLE  stream_tmp.fs_table (\n" +
>"  host STRING,\n" +
>"  url STRING," +
>"  public_date STRING" +
>") partitioned by (public_date string) " +
>"stored as PARQUET " +
>"TBLPROPERTIES (\n" +
>"  'sink.partition-commit.delay'='0 s',\n" +
>"  'sink.partition-commit.trigger'='partition-time',\n" +
>"  'sink.partition-commit.policy.kind'='metastore,success-file'" +
>")";
> tableEnv.executeSql(hiveSql);
>
>
> tableEnv.executeSql("INSERT INTO  stream_tmp.fs_table SELECT host, url,
> DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");


Re: Flink sql中可以使用自定义窗口触发器吗

2020-07-21 文章 Jark Wu
Hi,

目前是不支持的。不过有个实验性功能可以指定提前输出的策略和迟到处理的策略 [1],可能可以满足你的需求。

Best,
Jark

[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L175

On Tue, 21 Jul 2020 at 22:28, 462329521 <462329...@qq.com> wrote:

> Hi,想问下现在的Flinksql支持使用自定义窗口触发器吗?


Re: flink解析kafka json数据

2020-07-21 文章 Jark Wu
目前是不支持的。这个需求有点太业务特定了。flink 不可能为了一个错误日志去抽象、对接各种存储系统。
一种方案是社区可以考虑支持下打印到日志里,然后用户可以通过自定义插件 log appender 写入外部存储。

Best,
Jark

On Tue, 21 Jul 2020 at 18:53, Dream-底限  wrote:

> hi
>  json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃
>
> Leonard Xu  于2020年7月21日周二 下午4:18写道:
>
> > Hi,
> > 我理解应该做不到,因为这两个format参数在format里就做的。
> > json.ignore-parse-errors 是在
> > format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field
> > 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
> > 这两个不能同时为ture,语义上就是互斥的。
> >
> > Best
> > Leonard Xu
> > > 在 2020年7月21日,16:08,Dream-底限  写道:
> > >
> > >
> >
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储
> >
> >
>


Flink sql中可以使用自定义窗口触发器吗

2020-07-21 文章 462329521
Hi,想问下现在的Flinksql支持使用自定义窗口触发器吗?

Re: flink 1.11 cdc: kafka中存了canal-json格式的多张表信息,需要按表解析做处理,sink至不同的下游,要怎么支持?

2020-07-21 文章 Jark Wu
Hi,

目前 Flink SQL CDC 是不支持自动感知新表的,得要提前定义要表的 schema 然后提交同步作业。比如你上面的例子,就需要定义两个
source 表:

CREATE TABLE `test` (
  `id` int,
  `name` string,
  `time` timestamp(3),
  `status` int
) with (
  'connector' = 'kafka',
  'format' = 'canal-json',
  ...
);

insert into downstream1 select * from `test`;


CREATE TABLE `status` (
  `id` int
  `name` string
) with (
  'connector' = 'kafka',
  'format' = 'canal-json',
  ...
);

insert into downstream2 select * from `status`;


Best,
Jark





On Tue, 21 Jul 2020 at 15:19, godfrey he  wrote:

>
> http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html
>  这个邮件里提到了类似的问题。
>
> https://issues.apache.org/jira/browse/FLINK-18002 这个issue完成后(1.12),你可以将
> “data”,“mysqlType”等格式不确定的字段定义为String类型,
> 下游通过udf自己再解析对应的json
>
>
> Best,
> Godfrey
>
> jindy_liu <286729...@qq.com> 于2020年7月21日周二 下午12:37写道:
>
> > 例如:
> >
> > mysql表:
> > CREATE TABLE `test` (
> >   `id` int(11) NOT NULL,
> >   `name` varchar(255) NOT NULL,
> >   `time` datetime NOT NULL,
> >   `status` int(11) NOT NULL,
> >   PRIMARY KEY (`id`)
> > ) ENGINE=InnoDB DEFAULT CHARSET=utf8
> >
> > CREATE TABLE `status` (
> >   `id` int(11) NOT NULL,
> >   `name` varchar(255) NOT NULL,
> >   PRIMARY KEY (`id`)
> > ) ENGINE=InnoDB DEFAULT CHARSET=utf8
> >
> > kafka中数据:
> > // 表test 中insert事件
> > {"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
> >
> >
> 18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}
> >
> > //表status 中的事件
> >
> >
> {"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}
> >
> > 如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData,
> > 感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。
> >
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: SQL 报错只有 flink runtime 的 NPE

2020-07-21 文章 Jark Wu
这个异常一般是由于 UDF 的实现用了主类型(int),但是实际的字段值有 null 值。
你可以试试先做个 where 条件过滤,将 null 值过滤掉?

Best,
Jark


On Mon, 20 Jul 2020 at 15:28, godfrey he  wrote:

> 看不到图片信息,换一个图床工具上传图片吧
>
> Luan Cooper  于2020年7月17日周五 下午4:11写道:
>
> > 附一个 Job Graph 信息,在 Cal 处挂了
> > [image: image.png]
> >
> > On Fri, Jul 17, 2020 at 4:01 PM Luan Cooper  wrote:
> >
> >> 实际有 20 左右个字段,用到的 UDF 有 COALESCE / CAST / JSON_PATH / TIMESTAMP 类
> >> *是指 UDF 返回了 NULL 导致的吗?*
> >>
> >>
> >> On Fri, Jul 17, 2020 at 2:54 PM godfrey he  wrote:
> >>
> >>> udf_xxx的逻辑是啥?
> >>>
> >>>
> >>> Luan Cooper  于2020年7月17日周五 下午2:40写道:
> >>>
> >>> > Hi
> >>> >
> >>> > 我有这么一个 SQL
> >>> > INSERT INTO es
> >>> > SELECT
> >>> > a,
> >>> > udf_xxx(b)
> >>> > FROM mongo_oplog -- 自定义 TableFactory
> >>> >
> >>> > Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码
> >>> Exception,可以稳定重现
> >>> >
> >>> > LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
> >>> > (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.
> >>> >
> >>> > java.lang.NullPointerException
> >>> >
> >>> > at StreamExecCalc$8016.split$7938$(Unknown Source)
> >>> >
> >>> > at StreamExecCalc$8016.processElement(Unknown Source)
> >>> >
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> >>> >
> >>> > at
> >>> > org.apache.flink.streaming.runtime.io
> >>> >
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> >>> >
> >>> > at
> >>> > org.apache.flink.streaming.runtime.io
> >>> > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> >>> >
> >>> > at
> >>> > org.apache.flink.streaming.runtime.io
> >>> >
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> >>> >
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> >>> >
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> >>> >
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> >>> >
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> >>> >
> >>> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>> >
> >>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>> >
> >>> > at java.lang.Thread.run(Thread.java:748)
> >>> >
> >>> > 请问这种怎样情况排查问题?
> >>> > 有任何线索都可以
> >>> >
> >>> > 感谢
> >>> >
> >>>
> >>
>


Re:Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 zhanglianzhg
看下CliClient.java源码 open接口,
final Optional cmdCall = parseCommand(line);
cmdCall.ifPresent(this::callCommand);
可以看出解析字符串后执行响应命令。
目前我们这边一个项目也在做相似的,可以界面写好slq,以分号作为分隔符表示ddl或则DMl作为分隔符。
然后以文件方式保存(可以作为日志等用作)。
然后自己实现一个excutor类包装了tableEnvironment,主要功能用作string命令解析以及命令执行,可以简单的把flink的解析以及
callCommand拿过来,然后加以改造,内部支持ddl、dml、函数注册等。

这样不管做什么table操作,创建表或者注册函数、执行操作命令一个接口搞定。
其主要改动是:扩展callCommand以及SqlCommandCall
















在 2020-07-21 20:35:01,"godfrey he"  写道:
>sql-client.sh的-u是指update语句,目前只支持insert。
>
>Jark Wu  于2020年7月21日周二 下午6:47写道:
>
>> Hi,
>>
>> 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。
>> 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。
>> 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.
>>
>> Best,
>> Jark
>>
>> On Thu, 16 Jul 2020 at 19:43, Harold.Miao  wrote:
>>
>> > hi flink users
>> >
>> > 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
>> > 这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!
>> >
>> >
>> > --
>> >
>> > Best Regards,
>> > Harold Miao
>> >
>>


??????flink-1.11 ddl kafka-to-hive????

2020-07-21 文章 JasonLee
hi
hive??


| |
JasonLee
|
|
??17610775...@163.com
|

Signature is customized by Netease Mail Master

??2020??07??21?? 19:09??kcz ??
hive-1.2.1
chk 
??chkchk??kafkahive??
String hiveSql = "CREATE  TABLE  stream_tmp.fs_table (\n" +
   "  host STRING,\n" +
   "  url STRING," +
   "  public_date STRING" +
   ") partitioned by (public_date string) " +
   "stored as PARQUET " +
   "TBLPROPERTIES (\n" +
   "  'sink.partition-commit.delay'='0 s',\n" +
   "  'sink.partition-commit.trigger'='partition-time',\n" +
   "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
   ")";
tableEnv.executeSql(hiveSql);


tableEnv.executeSql("INSERT INTO  stream_tmp.fs_table SELECT host, url, 
DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");

Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 godfrey he
sql-client.sh的-u是指update语句,目前只支持insert。

Jark Wu  于2020年7月21日周二 下午6:47写道:

> Hi,
>
> 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。
> 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。
> 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.
>
> Best,
> Jark
>
> On Thu, 16 Jul 2020 at 19:43, Harold.Miao  wrote:
>
> > hi flink users
> >
> > 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
> > 这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 Harold.Miao
谢谢  我暂时这样改了一下

public boolean submitUpdate(String statement) {
   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
   terminal.writer().println(new AttributedString(statement).toString());
   terminal.flush();

   final Optional parsedStatement = parseCommand(statement);
   // only support INSERT INTO/OVERWRITE
   return parsedStatement.map(cmdCall -> {
  switch (cmdCall.command) {
 case INSERT_INTO:
 case INSERT_OVERWRITE:
return callInsert(cmdCall);




* case CREATE_TABLE:callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_TABLE_CREATED);return true;*
default:
printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
return false;
  }
   }).orElse(false);
}


Jark Wu  于2020年7月21日周二 下午6:41写道:

> Hi,
>
> 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。
> 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。
> 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.
>
> Best,
> Jark
>
> On Thu, 16 Jul 2020 at 19:43, Harold.Miao  wrote:
>
> > hi flink users
> >
> > 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
> > 这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


Re: flink1.11 tablefunction

2020-07-21 文章 godfrey he
可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide


Dream-底限  于2020年7月21日周二 下午7:25写道:

> hi
>
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
>


Re: flink table??????????????????????????????

2020-07-21 文章 ??????
1.??insert??checkg_sink_unit
2.kafkag_unit

Re: flink table同时作为写出及输入时下游无数据

2020-07-21 文章 咿咿呀呀
就是没有数据,我这个是简化版本的,都切换为kafka的初始源是没问题的



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11 tablefunction

2020-07-21 文章 Dream-底限
hi
我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)


Re: Flink 1.11 submit job timed out

2020-07-21 文章 Congxian Qiu
Hi
   不确定 k8s 环境中能否看到 pod 的完整日志?类似 Yarn 的 NM 日志一样,如果有的话,可以尝试看一下这个 pod
的完整日志有没有什么发现
Best,
Congxian


SmileSmile  于2020年7月21日周二 下午3:19写道:

> Hi,Congxian
>
> 因为是测试环境,没有配置HA,目前看到的信息,就是JM刷出来大量的no hostname could be
> resolved,jm失联,作业提交失败。
> 将jm内存配置为10g也是一样的情况(jobmanager.memory.pprocesa.size:10240m)。
>
> 在同一个环境将版本回退到1.10没有出现该问题,也不会刷如上报错。
>
>
> 是否有其他排查思路?
>
> Best!
>
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> On 07/16/2020 13:17, Congxian Qiu wrote:
> Hi
>   如果没有异常,GC 情况也正常的话,或许可以看一下 pod 的相关日志,如果开启了 HA 也可以看一下 zk 的日志。之前遇到过一次在 Yarn
> 环境中类似的现象是由于其他原因导致的,通过看 NM 日志以及 zk 日志发现的原因。
>
> Best,
> Congxian
>
>
> SmileSmile  于2020年7月15日周三 下午5:20写道:
>
> > Hi Roc
> >
> > 该现象在1.10.1版本没有,在1.11版本才出现。请问这个该如何查比较合适
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > On 07/15/2020 17:16, Roc Marshal wrote:
> > Hi,SmileSmile.
> > 个人之前有遇到过 类似 的host解析问题,可以从k8s的pod节点网络映射角度排查一下。
> > 希望这对你有帮助。
> >
> >
> > 祝好。
> > Roc Marshal
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-07-15 17:04:18,"SmileSmile"  写道:
> > >
> > >Hi
> > >
> > >使用版本Flink 1.11,部署方式 kubernetes session。 TM个数30个,每个TM 4个slot。 job
> > 并行度120.提交作业的时候出现大量的No hostname could be resolved for the IP address,JM
> time
> > out,作业提交失败。web ui也会卡主无响应。
> > >
> > >用wordCount,并行度只有1提交也会刷,no hostname的日志会刷个几条,然后正常提交,如果并行度一上去,就会超时。
> > >
> > >
> > >部分日志如下:
> > >
> > >2020-07-15 16:58:46,460 WARN
> > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > hostname could be resolved for the IP address 10.32.160.7, using IP
> address
> > as host name. Local input split assignment (such as for HDFS files) may
> be
> > impacted.
> > >2020-07-15 16:58:46,460 WARN
> > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > hostname could be resolved for the IP address 10.44.224.7, using IP
> address
> > as host name. Local input split assignment (such as for HDFS files) may
> be
> > impacted.
> > >2020-07-15 16:58:46,461 WARN
> > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > hostname could be resolved for the IP address 10.40.32.9, using IP
> address
> > as host name. Local input split assignment (such as for HDFS files) may
> be
> > impacted.
> > >
> > >2020-07-15 16:59:10,236 INFO
> > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> The
> > heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed
> out.
> > >2020-07-15 16:59:10,236 INFO
> > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> > Disconnect job manager 
> > @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2 for job
> > e1554c737e37ed79688a15c746b6e9ef from the resource manager.
> > >
> > >
> > >how to deal with ?
> > >
> > >
> > >beset !
> > >
> > >| |
> > >a511955993
> > >|
> > >|
> > >邮箱:a511955...@163.com
> > >|
> > >
> > >签名由 网易邮箱大师 定制
> >
>


flink-1.11 ddl kafka-to-hive????

2020-07-21 文章 kcz
hive-1.2.1
chk 
??chkchk??kafkahive??
String hiveSql = "CREATE  TABLE  stream_tmp.fs_table (\n" +
"  host STRING,\n" +
"  url STRING," +
"  public_date STRING" +
") partitioned by (public_date string) " +
"stored as PARQUET " +
"TBLPROPERTIES (\n" +
"  'sink.partition-commit.delay'='0 s',\n" +
"  'sink.partition-commit.trigger'='partition-time',\n" +
"  'sink.partition-commit.policy.kind'='metastore,success-file'" +
")";
tableEnv.executeSql(hiveSql);


tableEnv.executeSql("INSERT INTO  stream_tmp.fs_table SELECT host, url, 
DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");

Re: flink解析kafka json数据

2020-07-21 文章 Dream-底限
hi
 json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃

Leonard Xu  于2020年7月21日周二 下午4:18写道:

> Hi,
> 我理解应该做不到,因为这两个format参数在format里就做的。
> json.ignore-parse-errors 是在
> format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field
> 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
> 这两个不能同时为ture,语义上就是互斥的。
>
> Best
> Leonard Xu
> > 在 2020年7月21日,16:08,Dream-底限  写道:
> >
> >
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储
>
>


Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-21 文章 Yang Wang
configmap "flink-config-k8s-session-1" not found的报错是正常的

因为目前的实现是先创建JobManager Deployment,然后再创建ConfigMap并设置owner reference到deployment
所以你才会看到创建Pod的时候报ConfigMap还没有创建出来,这个是正常的信息,K8s会自动重试创建Pod

你现在是任务起不来吗,还是有什么其他的问题?



Best,
Yang

Yvette zhai  于2020年7月14日周二 上午10:20写道:

> 补充一下,kubernetes版本是1.18
> Yvette zhai  于2020年7月13日周一 下午9:10写道:
>
> > 1. 执行的脚本,产生的日志是:
> > 2020-07-13 21:00:25,248 INFO
> >  org.apache.flink.configuration.GlobalConfiguration   [] -
> Loading
> > configuration property: jobmanager.rpc.address, localhost
> > 2020-07-13 21:00:25,251 INFO
> >  org.apache.flink.configuration.GlobalConfiguration   [] -
> Loading
> > configuration property: jobmanager.rpc.port, 6123
> > 2020-07-13 21:00:25,251 INFO
> >  org.apache.flink.configuration.GlobalConfiguration   [] -
> Loading
> > configuration property: jobmanager.memory.process.size, 1600m
> > 2020-07-13 21:00:25,251 INFO
> >  org.apache.flink.configuration.GlobalConfiguration   [] -
> Loading
> > configuration property: taskmanager.memory.process.size, 1728m
> > 2020-07-13 21:00:25,251 INFO
> >  org.apache.flink.configuration.GlobalConfiguration   [] -
> Loading
> > configuration property: taskmanager.numberOfTaskSlots, 1
> > 2020-07-13 21:00:25,251 INFO
> >  org.apache.flink.configuration.GlobalConfiguration   [] -
> Loading
> > configuration property: parallelism.default, 1
> > 2020-07-13 21:00:25,252 INFO
> >  org.apache.flink.configuration.GlobalConfiguration   [] -
> Loading
> > configuration property: jobmanager.execution.failover-strategy, region
> > 2020-07-13 21:00:25,344 INFO
> >  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader []
> -
> > Could not load factory due to missing dependencies.
> > 2020-07-13 21:00:26,136 INFO
> >  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
> > derived from fraction jvm overhead memory (160.000mb (167772162 bytes))
> is
> > less than its min value 192.000mb (201326592 bytes), min value will be
> used
> > instead
> > 2020-07-13 21:00:26,154 INFO
> >  org.apache.flink.kubernetes.utils.KubernetesUtils[] -
> > Kubernetes deployment requires a fixed port. Configuration
> blob.server.port
> > will be set to 6124
> > 2020-07-13 21:00:26,154 INFO
> >  org.apache.flink.kubernetes.utils.KubernetesUtils[] -
> > Kubernetes deployment requires a fixed port. Configuration
> > taskmanager.rpc.port will be set to 6122
> > 2020-07-13 21:00:26,204 INFO
> >  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
> > derived from fraction jvm overhead memory (160.000mb (167772162 bytes))
> is
> > less than its min value 192.000mb (201326592 bytes), min value will be
> used
> > instead
> > 2020-07-13 21:00:26,220 WARN
> >
> org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
> > [] - Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop
> > Configuration ConfigMap.
> > 2020-07-13 21:00:26,220 WARN
> >
> org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
> > [] - Found 0 files in directory null/etc/hadoop, skip to create the
> Hadoop
> > Configuration ConfigMap.
> > 2020-07-13 21:00:26,958 INFO
> >  org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> > flink session cluster k8s-session-1 successfully, JobManager Web
> Interface:
> > http://172.16.5.175:8081
> >
> > 2. 查看 desrcibe 日志是:
> > MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> > "flink-config-k8s-session-1" not found
> >
> > 3. logs 日志是:
> >
> > Start command : /bin/bash -c $JAVA_HOME/bin/java -classpath
> > $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824
> > -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log
> > -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
> > -Dlog4j.configurationFile=file:/opt/flink/conf/log4j.properties
> > org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
> > 1> /opt/flink/log/jobmanager.out 2> /opt/flink/log/jobmanager.err
> >
> > 4. kubectl get cm 可以看到
> > NAME DATA   AGE
> > flink-config-k8s-session-1   3  5m45s
> >
> > 麻烦大佬帮忙看看~是不是我的语句有问题还是缺什么文件~
> > 我是直接官网下的包,没有改任何文件~
> >
> > Leonard Xu  于2020年7月13日周一 下午8:41写道:
> >
> >> Hi, zhai
> >>
> >> 可以贴详细点吗?我帮你 CC 了熟悉这块的大佬 Yun Gao
> >>
> >> 祝好
> >>
> >> > 在 2020年7月13日,20:11,Yvette zhai  写道:
> >> >
> >> > 报错是MountVolume.SetUp failed for volume "flink-config-volume" :
> configmap
> >> > "flink-config-k8s-session-1" not found
> >> >
> >> >
> >> > Leonard Xu  于2020年7月13日周一 下午8:03写道:
> >> >
> >> >> Hi, zhai
> >> >>
> >> >> 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。
> >> >>
> >> >> Best,
> >> >> Leonard Xu
> >> >>
> >> >>> 在 2020年7月13日,19:59,Yvette zhai  写道:
> >> >>>
> >> >>> 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
> >> >>> 下载的flink-1.11.0-bin-scala_2.11.tgz
> >> >>>
> >> >>> 执行命令是
> >> >>> ./bin/kubernetes-session.sh \
> >> >>>-Dkubernetes.cluster-id=k8s-session-1 \
> >> >>>   

Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-21 文章 Jark Wu
这个对应关系是通过 Factory#factoryIdentifier 来决定的。
比如 DebeziumJsonFormatFactory#factoryIdentifier() 就是返回了 'debezium-json'

Best,
Jark

On Thu, 16 Jul 2020 at 22:29, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
>  谢谢,我理解了。
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Harold.Miao
> Send Time: 2020-07-16 19:33
> Receiver: user-zh
> Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码
>
> private static  T findSingleInternal(
>   Class factoryClass,
>   Map properties,
>   Optional classLoader) {
>
>List tableFactories = discoverFactories(classLoader);
>List filtered = filter(tableFactories, factoryClass, properties);
>
>if (filtered.size() > 1) {
>   throw new AmbiguousTableFactoryException(
>  filtered,
>  factoryClass,
>  tableFactories,
>  properties);
>} else {
>   return filtered.get(0);
>}
> }
>
> private static List
> discoverFactories(Optional classLoader) {
>try {
>   List result = new LinkedList<>();
>   ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>   ServiceLoader
>  .load(TableFactory.class, cl)
>  .iterator()
>  .forEachRemaining(result::add);
>   return result;
>} catch (ServiceConfigurationError e) {
>   LOG.error("Could not load service provider for table factories.", e);
>   throw new TableException("Could not load service provider for
> table factories.", e);
>}
>
> }
>
>
> wangl...@geekplus.com.cn  于2020年7月16日周四
> 下午7:04写道:
>
> >
> > 我在
> >
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> > 找到了 SPI 的配置:
> >
> > org.apache.flink.formats.json.JsonFileSystemFormatFactory
> > org.apache.flink.formats.json.JsonFormatFactory
> > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> > org.apache.flink.formats.json.canal.CanalJsonFormatFactory
> >
> > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> > 代码没找到类似的关系映射配置。
> >
> >
> > 谢谢,
> > 王磊
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
> > Sender: godfrey he
> > Send Time: 2020-07-16 16:38
> > Receiver: user-zh
> > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
> >
> > Best,
> > Godfrey
> >
> > wangl...@geekplus.com.cn  于2020年7月16日周四
> > 下午4:02写道:
> >
> > > 比如:
> > >
> > > CREATE TABLE my_table (
> > >   id BIGINT,
> > >  first_name STRING,
> > >  last_name STRING,
> > >  email STRING
> > > ) WITH (
> > >  'connector'='kafka',
> > >  'topic'='user_topic',
> > >  'properties.bootstrap.servers'='localhost:9092',
> > >  'scan.startup.mode'='earliest-offset',
> > >  'format'='debezium-json'
> > > );
> > >
> > > 最终解析 debezium-json 应该是
> > >
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > > 下面的代码
> > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> > >
> > > 谢谢,
> > > 王磊
> > >
> > >
> > > wangl...@geekplus.com.cn
> > >
> > >
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>


Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 Jark Wu
Hi,

你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。
社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。
不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.

Best,
Jark

On Thu, 16 Jul 2020 at 19:43, Harold.Miao  wrote:

> hi flink users
>
> 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
> 这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!
>
>
> --
>
> Best Regards,
> Harold Miao
>


Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 文章 wxpcc
补充:
最终查询为

SELECT
 t.*
FROM
  kafka_source,
  LATERAL TABLE( fromJson(data) ) as t



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 文章 wxpcc
如果不等待最新版本的话也可以这样

将 纯数组的数据作为字符串 从source消费,增加自定义的json解析函数,判断 isArray 之后 遍历进行 collect

if (Objects.nonNull(str)) {
if (isArray) {
JsonNode node = objectMapper.readTree(str);
if (node.isArray()) {
Iterator nodeIterator = node.elements();
while (nodeIterator.hasNext()) {
   
collect(deserializationSchema.deserialize(nodeIterator.next().toString().getBytes()));
}
}
} else {
   
collect(deserializationSchema.deserialize(str.getBytes()));
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink table同时作为写出及输入时下游无数据

2020-07-21 文章 godfrey he
你可以先只跑第一个insert,然后check一下g_sink_unit是否有数据。

另外,你可以把query 改为都读取 kafka_source_tab 再分别写到两个不同的sink:

sql1='''Insert into g_sink_unit select alarm_id,trck_id from
kafka_source_tab'''
sql2='''Insert into g_summary_base select alarm_id,trck_id from
kafka_source_tab;'''

小学生 <201782...@qq.com> 于2020年7月21日周二 下午5:47写道:

>
> 各位大佬好,请教一个问题,就是在flink内部定义一个表g_unit(初始为空),接受一个kafka源的写入,同时g_unit又要作为下游表g_summary的输入源,测试发现g_line表一直不会写入数据,代码如下,烦请大佬解答。
>
>
>
>
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>
>
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> env.set_parallelism(1)
> env_settings =
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> t_env = StreamTableEnvironment.create(env,
> environment_settings=env_settings)
>
>
>
> kafka_source_ddl = """
> CREATE TABLE kafka_source_tab (
> id VARCHAR, 
> alarm_id VARCHAR, 
> trck_id VARCHAR
>
>
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'gg', 
> 'scan.startup.mode' = 'specific-offsets',
> 'scan.startup.specific-offsets'='partition:1,offset:0',
> 'properties.bootstrap.servers' = '',
> 'format' = 'json'
> )
> """
> g_unit_sink_ddl = """
> CREATE TABLE g_sink_unit (
> alarm_id VARCHAR, 
> trck_id VARCHAR
> 
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
> 'table-name' = 'g_unit', 
> 'username' = 'root',
> 'password' = 'root',
> 'sink.buffer-flush.interval' = '1s'  
> )
> """
> g_summary_ddl = """
> CREATE TABLE g_summary_base(
> alarm_id VARCHAR, 
> trck_id VARCHAR
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
> 'table-name' = 'g_summary',
> 'username' = 'root',
> 'password' = 'root',
> 'sink.buffer-flush.interval' = '1s'
> )
> """
>
> t_env.execute_sql(kafka_source_ddl)
> t_env.execute_sql(g_unit_sink_ddl)
> t_env.execute_sql(g_summary_ddl)
>
>
> sql1='''Insert into g_sink_unitselect alarm_id,trck_id from
> kafka_source_tab'''
> sql2='''Insert into g_summary_baseselect alarm_id,trck_id from
> g_sink_unit'''
>
>
>
> stmt_set = t_env.create_statement_set()
> stmt_set.add_insert_sql(sql1)
> stmt_set.add_insert_sql(sql2)
>
>
> stmt_set.execute().get_job_client().get_job_execution_result().result()


Re: Re: flink 1.11 sql类型问题

2020-07-21 文章 Jark Wu
你是说输出的时候想带 'Z' 后缀?
如果这样的话,我觉得 json.timestamp-format.standard = 'ISO-8601' 这个参数应该能解决你的问题。

Best,
Jark

On Thu, 16 Jul 2020 at 10:02, sunfulin  wrote:

>
>
>
> hi, leonard
> 感谢回复。我在es的ddl with参数里加了这个,貌似还是报错。我再简单描述下我的场景:
> 我的es sink的ddl如下:
> create table es_sink (
>   a varchar,
>   b varchar,
>   c TIMESTAMP(9) WITH LOCAL TIME ZONE
> ) with (
>   
> )
>
>
>
> 我使用处理时间属性,将流里的proctime转成UTC格式的日期类型,作为c这个字段写入。现在能原生支持么?之前在1.10版本貌似是可以直接写的。但是到1.11写的不带时区了,导致不能兼容之前的格式。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-16 09:40:06,"Leonard Xu"  写道:
> >Hello
> >
> >json解析UTC时间是支持的,你with参数里指定下json中timestamp的类型试下,
> json.timestamp-format.standard = 'ISO-8601'
> >
> >Best
> >Leonard Xu
> >[1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> >
> >
> >> 在 2020年7月15日,23:19,sunfulin  写道:
> >>
> >> hi,
> >> 我通过flink sql 定义了一个es sink,其中有个字段类型定义为了 eventTime TIMESTAMP(9) WITH
> LOCAL TIME ZONE。
> >> 在尝试写入时,报了如下的异常。看来json parser无法解析这种类型。请问下大神们,我应该怎么写入一个UTC日期的时间类型?格式类似
> 2020-07-15T12:00:00.000Z
> >>
> >>
> >>
> >> java.lang.UnsupportedOperationException: Not support to parse type:
> TIMESTAMP(9) WITH LOCAL TIME ZONE
> >>
> >> at
> org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-15 21:24:30,"sunfulin"  写道:
> >>> hi,
> >>> 我看1.11的java.sql.Timestamp
> 对应的是Flink的TIMESTAMP(9),跟之前默认的TIMESTAMP(3)有区别,而且之前1.10的Timestamp(3)是带时区UTC的,现在这个类型不带时区了。想问下这个具体调整应该如何适配?
> >
>


flink table??????????????????????????????

2020-07-21 文章 ??????
flink??g_unit()??kafka??g_unit??g_summary??g_line




from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings



env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
id VARCHAR,  
alarm_id VARCHAR,  
trck_id VARCHAR 


) WITH (
'connector' = 'kafka',
'topic' = 'gg',  
'scan.startup.mode' = 'specific-offsets', 
'scan.startup.specific-offsets'='partition:1,offset:0',
'properties.bootstrap.servers' = '',
'format' = 'json' 
)
"""
g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
alarm_id VARCHAR,  
trck_id VARCHAR 

) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_unit',  
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'  
)
"""
g_summary_ddl = """
CREATE TABLE g_summary_base(
alarm_id VARCHAR,  
trck_id VARCHAR 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_summary', 
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'
)
"""

t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)


sql1='''Insert into g_sink_unitselect alarm_id,trck_id from 
kafka_source_tab'''
sql2='''Insert into g_summary_baseselect alarm_id,trck_id from 
g_sink_unit'''



stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)


stmt_set.execute().get_job_client().get_job_execution_result().result()

Re: flink1.11启动问题

2020-07-21 文章 Shuiqiang Chen
Hi,

可以尝试在jm的log里看看是在申请哪个资源的时候超时了, 对比下所申请的资源规格和集群可用资源

Best,
Shuiqiang

酷酷的浑蛋  于2020年7月21日周二 下午4:37写道:

>
>
> 服了啊,这个flink1.11启动怎么净是问题啊
>
>
> 我1.7,1.8,1.9 都没有问题,到11就不行
> ./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm
> 1024 -ynm sql_test ./examples/batch/WordCount.jar --input
> hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a
>
>
> 报错:
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources. at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ... 45 more Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.TimeoutException at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> ... 25 more
>
>
>
> 我资源是足的啊,就flink1.11起不来,一直卡在那里,卡好久然后报这个错,大神们帮看看吧,昨天的jar包冲突问题已经解决(只有flink1.11存在的问题),
>
>


Re: connector hive依赖冲突

2020-07-21 文章 Dream-底限
hi,
不排除依赖的话环境都起不来的哈,
java.lang.IncompatibleClassChangeError: Implementing class

at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at
org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:112)
at
org.apache.flink.table.planner.delegation.StreamPlanner.(StreamPlanner.scala:48)
at
org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:130)
at
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
at
com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

Rui Li  于2020年7月20日周一 上午11:48写道:

> 现在具体是遇到了什么冲突呀?hive
> connector本身在依赖hive的时候确实也排除了很多传递依赖,才能正常运行UT和IT。也可以参考我们的pom来看排除了哪些依赖:
>
> https://github.com/apache/flink/blob/release-1.11.0/flink-connectors/flink-connector-hive/pom.xml
>
> On Fri, Jul 17, 2020 at 5:32 PM Dream-底限  wrote:
>
> > hi
> > 我用的是用户定义依赖,没有用捆绑依赖包,捆绑依赖包还要自己下载一次。
> >
> > Dream-底限  于2020年7月17日周五 下午5:24写道:
> >
> > > 1.9和1.10时候排除一些传递依赖后在idea和打uber jar在集群环境都可以运行,不排除传递依赖的话在idea运行不了;
> > >
> >
> 1.11现在只在本地测哪,不排除传递依赖idea运行不了,集群环境还没弄,但是我感觉在idea直接run这个功能好多人都需要,文档是不是可以改进一下
> > >
> > > Jingsong Li  于2020年7月17日周五 下午5:16写道:
> > >
> > >> 用bundle jar可以搞定吗?
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#using-bundled-hive-jar
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Fri, Jul 17, 2020 at 5:14 PM Dream-底限  wrote:
> > >>
> > >> > hi:
> > >> >
> > >> >
> > >>
> >
> 大佬们,下面连接hive的依赖包的哪个传递依赖导致的jar包冲突,我从1.9到1.11每次在maven按照官方文档引包都会出现依赖冲突1.9刚发布的时候对下面的引包有做依赖排除,后来文档改了
> > >> >
> > >> > // Flink's Hive connector.Contains flink-hadoop-compatibility and
> > >> > flink-orc jars
> > >> >flink-connector-hive_2.11-1.11.0.jar
> > >> >// Hive dependencies
> > >> >hive-exec-2.3.4.jar
> > >> >
> > >>
> > >>
> > >> --
> > >> Best, Jingsong Lee
> > >>
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


????: flink table??????????????????????????????

2020-07-21 文章 chengyanan1...@foxmail.com
??

sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from 
kafka_source_tab'''
sql2='''Insert into g_summary_ddl select alarm_id,trck_id from 
g_unit_sink_ddl'''

g_sink_unit ?? g_summary_base



chengyanan1...@foxmail.com
 
 ??
?? 2020-07-21 16:38
 user-zh
?? flink table??
flink??g_unit()??kafka??g_unit??g_summary??g_line
 
 
 
 
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
 
 
 
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
 
 
 
kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
id VARCHAR,  
alarm_id VARCHAR,  
trck_id VARCHAR 
 
 
) WITH (
'connector' = 'kafka',
'topic' = 'gg',  
'scan.startup.mode' = 'specific-offsets', 
'scan.startup.specific-offsets'='partition:1,offset:0',
'properties.bootstrap.servers' = '',
'format' = 'json' 
)
"""
g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
alarm_id VARCHAR,  
trck_id VARCHAR 

) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_unit',  
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'  
)
"""
g_summary_ddl = """
CREATE TABLE g_summary_base(
alarm_id VARCHAR,  
trck_id VARCHAR 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_summary', 
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'
)
"""
 
t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)
 
 
sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from 
kafka_source_tab'''
sql2='''Insert into g_summary_ddl select alarm_id,trck_id from 
g_unit_sink_ddl'''
 
 
 
stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)
 
 
stmt_set.execute().get_job_client().get_job_execution_result().result()


flink table??????????????????????????????

2020-07-21 文章 ??????
flink??g_unit()??kafka??g_unit??g_summary??g_line




from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings



env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
id VARCHAR, 
alarm_id VARCHAR, 
trck_id VARCHAR


) WITH (
'connector' = 'kafka',
'topic' = 'gg', 
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets'='partition:1,offset:0',
'properties.bootstrap.servers' = '',
'format' = 'json'
)
"""
g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
alarm_id VARCHAR, 
trck_id VARCHAR

) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_unit', 
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'  
)
"""
g_summary_ddl = """
CREATE TABLE g_summary_base(
alarm_id VARCHAR, 
trck_id VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_summary',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'
)
"""

t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)


sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from 
kafka_source_tab'''
sql2='''Insert into g_summary_ddl select alarm_id,trck_id from 
g_unit_sink_ddl'''



stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)


stmt_set.execute().get_job_client().get_job_execution_result().result()

flink table??????????????????????????????

2020-07-21 文章 ??????
flink??g_unit()??kafka??g_unit??g_summary??g_line




from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings



env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
id VARCHAR,  
alarm_id VARCHAR,  
trck_id VARCHAR 


) WITH (
'connector' = 'kafka',
'topic' = 'gg',  
'scan.startup.mode' = 'specific-offsets', 
'scan.startup.specific-offsets'='partition:1,offset:0',
'properties.bootstrap.servers' = '',
'format' = 'json' 
)
"""
g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
alarm_id VARCHAR,  
trck_id VARCHAR 

) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_unit',  
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'  
)
"""
g_summary_ddl = """
CREATE TABLE g_summary_base(
alarm_id VARCHAR,  
trck_id VARCHAR 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_summary', 
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'
)
"""

t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)


sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from 
kafka_source_tab'''
sql2='''Insert into g_summary_ddl select alarm_id,trck_id from 
g_unit_sink_ddl'''



stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)


stmt_set.execute().get_job_client().get_job_execution_result().result()

flink1.11启动问题

2020-07-21 文章 酷酷的浑蛋


服了啊,这个flink1.11启动怎么净是问题啊


我1.7,1.8,1.9 都没有问题,到11就不行
./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm 1024 
-ynm sql_test ./examples/batch/WordCount.jar --input 
hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a


报错:
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate the required slot within slot request timeout. Please make 
sure that the cluster has enough resources. at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
 ... 45 more Caused by: java.util.concurrent.CompletionException: 
java.util.concurrent.TimeoutException at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) 
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 ... 25 more


我资源是足的啊,就flink1.11起不来,一直卡在那里,卡好久然后报这个错,大神们帮看看吧,昨天的jar包冲突问题已经解决(只有flink1.11存在的问题),



????: ?????? pyflink1.11.0window

2020-07-21 文章 chengyanan1...@foxmail.com
Hi??
kafkaTableSink??AppendStreamTableSinkfinal_result2source1??group
 by??final_result??mysqljoingroup by??StreamTable 
RetractStreamTableSink??
??AppendStreamTableSink??RetractStreamTableSink


 ??  ?? 

??



chengyanan1...@foxmail.com
 
 ??
?? 2020-07-21 10:23
 user-zh
?? ?? pyflink1.11.0window
??
  
??pyflinkdemo??
 
 

 
 
----
??: 
   "user-zh"



Re: flink解析kafka json数据

2020-07-21 文章 Leonard Xu
Hi,
我理解应该做不到,因为这两个format参数在format里就做的。
json.ignore-parse-errors 是在 
format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field 
是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
这两个不能同时为ture,语义上就是互斥的。

Best
Leonard Xu
> 在 2020年7月21日,16:08,Dream-底限  写道:
> 
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储



flink解析kafka json数据

2020-07-21 文章 Dream-底限
hi
我这面在使用sql api解析kafka
json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储

json.ignore-parse-errors
son.fail-on-missing-field


Re: flink1.11 pyflink stream job 退出

2020-07-21 文章 Xingbo Huang
是的,execute是1.10及以前使用的,execute_sql是1.11之后推荐使用的

Best,
Xingbo

lgs <9925...@qq.com> 于2020年7月21日周二 下午3:57写道:

> 谢谢。加上后就可以了。
>
> 改成原来的sql_update然后st_env.execute("job")好像也可以。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.11 pyflink stream job 退出

2020-07-21 文章 lgs
谢谢。加上后就可以了。

改成原来的sql_update然后st_env.execute("job")好像也可以。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 pyflink stream job 退出

2020-07-21 文章 Xingbo Huang
Hi,
execute_sql是一个异步非阻塞的方法,所以你需要在你的代码末尾加上
sql_result.get_job_client().get_job_execution_result().result()
对此我已经创建了JIRA[1]

[1] https://issues.apache.org/jira/browse/FLINK-18598

Best,
Xingbo

lgs <9925...@qq.com> 于2020年7月21日周二 下午3:35写道:

> python flink_cep_example.py 过几秒就退出了,应该一直运行不退出的啊。
> 代码如下,使用了MATCH_RECOGNIZE:
>
> s_env = StreamExecutionEnvironment.get_execution_environment()
> b_s_settings =
>
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> st_env = StreamTableEnvironment.create(s_env,
> environment_settings=b_s_settings)
> configuration = st_env.get_config().get_configuration()
> configuration.set_string("taskmanager.memory.task.off-heap.size",
> "500m")
>
> s_env.set_parallelism(1)
>
> kafka_source = """CREATE TABLE source (
>  flow_name STRING,
>  flow_id STRING,
>  component STRING,
>  filename STRING,
>  event_time TIMESTAMP(3),
>  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'cep',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'format' = 'json',
>  'scan.startup.mode' = 'latest-offset'
> )"""
>
>
>
> postgres_sink = """
> CREATE TABLE cep_result (
> `filename` STRING,
> `start_tstamp`  TIMESTAMP(3),
> `end_tstamp`   TIMESTAMP(3)
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
> 'connector.table' = 'cep_result',
> 'connector.driver' = 'org.postgresql.Driver',
> 'connector.username' = 'postgres',
> 'connector.password' = 'my_password',
> 'connector.write.flush.max-rows' = '1'
> )
> """
>
> st_env.sql_update(kafka_source)
> st_env.sql_update(postgres_sink)
>
> postgres_sink_sql = '''
> INSERT INTO cep_result
> SELECT *
> FROM source
> MATCH_RECOGNIZE (
> PARTITION BY filename
> ORDER BY event_time
> MEASURES
> (A.event_time) AS start_tstamp,
> (D.event_time) AS end_tstamp
> ONE ROW PER MATCH
> AFTER MATCH SKIP PAST LAST ROW
> PATTERN (A B C D)
> DEFINE
> A AS component = 'XXX',
> B AS component = 'YYY',
> C AS component = 'ZZZ',
> D AS component = 'WWW'
> ) MR
> '''
>
> sql_result = st_env.execute_sql(postgres_sink_sql)
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink1.11 pyflink stream job 退出

2020-07-21 文章 lgs
python flink_cep_example.py 过几秒就退出了,应该一直运行不退出的啊。
代码如下,使用了MATCH_RECOGNIZE:

s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings =
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
st_env = StreamTableEnvironment.create(s_env,
environment_settings=b_s_settings)
configuration = st_env.get_config().get_configuration()
configuration.set_string("taskmanager.memory.task.off-heap.size",
"500m")

s_env.set_parallelism(1)

kafka_source = """CREATE TABLE source (
 flow_name STRING,
 flow_id STRING,
 component STRING,
 filename STRING,
 event_time TIMESTAMP(3),
 WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'cep',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)"""



postgres_sink = """
CREATE TABLE cep_result (
`filename` STRING,
`start_tstamp`  TIMESTAMP(3),
`end_tstamp`   TIMESTAMP(3)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
'connector.table' = 'cep_result',
'connector.driver' = 'org.postgresql.Driver',
'connector.username' = 'postgres',
'connector.password' = 'my_password',
'connector.write.flush.max-rows' = '1'
)
"""

st_env.sql_update(kafka_source)
st_env.sql_update(postgres_sink)

postgres_sink_sql = '''
INSERT INTO cep_result
SELECT *
FROM source
MATCH_RECOGNIZE (
PARTITION BY filename
ORDER BY event_time
MEASURES
(A.event_time) AS start_tstamp,
(D.event_time) AS end_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B C D)
DEFINE
A AS component = 'XXX',
B AS component = 'YYY',
C AS component = 'ZZZ',
D AS component = 'WWW'
) MR
'''

sql_result = st_env.execute_sql(postgres_sink_sql)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11 cdc: kafka中存了canal-json格式的多张表信息,需要按表解析做处理,sink至不同的下游,要怎么支持?

2020-07-21 文章 godfrey he
http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html
 这个邮件里提到了类似的问题。

https://issues.apache.org/jira/browse/FLINK-18002 这个issue完成后(1.12),你可以将
“data”,“mysqlType”等格式不确定的字段定义为String类型,
下游通过udf自己再解析对应的json


Best,
Godfrey

jindy_liu <286729...@qq.com> 于2020年7月21日周二 下午12:37写道:

> 例如:
>
> mysql表:
> CREATE TABLE `test` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   `time` datetime NOT NULL,
>   `status` int(11) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> CREATE TABLE `status` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> kafka中数据:
> // 表test 中insert事件
> {"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
>
> 18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}
>
> //表status 中的事件
>
> {"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}
>
> 如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData,
> 感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 1.11 submit job timed out

2020-07-21 文章 SmileSmile
Hi,Congxian

因为是测试环境,没有配置HA,目前看到的信息,就是JM刷出来大量的no hostname could be resolved,jm失联,作业提交失败。 
将jm内存配置为10g也是一样的情况(jobmanager.memory.pprocesa.size:10240m)。

在同一个环境将版本回退到1.10没有出现该问题,也不会刷如上报错。


是否有其他排查思路?

Best!




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

On 07/16/2020 13:17, Congxian Qiu wrote:
Hi
  如果没有异常,GC 情况也正常的话,或许可以看一下 pod 的相关日志,如果开启了 HA 也可以看一下 zk 的日志。之前遇到过一次在 Yarn
环境中类似的现象是由于其他原因导致的,通过看 NM 日志以及 zk 日志发现的原因。

Best,
Congxian


SmileSmile  于2020年7月15日周三 下午5:20写道:

> Hi Roc
>
> 该现象在1.10.1版本没有,在1.11版本才出现。请问这个该如何查比较合适
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> On 07/15/2020 17:16, Roc Marshal wrote:
> Hi,SmileSmile.
> 个人之前有遇到过 类似 的host解析问题,可以从k8s的pod节点网络映射角度排查一下。
> 希望这对你有帮助。
>
>
> 祝好。
> Roc Marshal
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-15 17:04:18,"SmileSmile"  写道:
> >
> >Hi
> >
> >使用版本Flink 1.11,部署方式 kubernetes session。 TM个数30个,每个TM 4个slot。 job
> 并行度120.提交作业的时候出现大量的No hostname could be resolved for the IP address,JM time
> out,作业提交失败。web ui也会卡主无响应。
> >
> >用wordCount,并行度只有1提交也会刷,no hostname的日志会刷个几条,然后正常提交,如果并行度一上去,就会超时。
> >
> >
> >部分日志如下:
> >
> >2020-07-15 16:58:46,460 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.32.160.7, using IP address
> as host name. Local input split assignment (such as for HDFS files) may be
> impacted.
> >2020-07-15 16:58:46,460 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.44.224.7, using IP address
> as host name. Local input split assignment (such as for HDFS files) may be
> impacted.
> >2020-07-15 16:58:46,461 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.40.32.9, using IP address
> as host name. Local input split assignment (such as for HDFS files) may be
> impacted.
> >
> >2020-07-15 16:59:10,236 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The
> heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
> >2020-07-15 16:59:10,236 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Disconnect job manager 
> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2 for job
> e1554c737e37ed79688a15c746b6e9ef from the resource manager.
> >
> >
> >how to deal with ?
> >
> >
> >beset !
> >
> >| |
> >a511955993
> >|
> >|
> >邮箱:a511955...@163.com
> >|
> >
> >签名由 网易邮箱大师 定制
>


回复: (无主题)

2020-07-21 文章 罗显宴
hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了
val result = num.timeWindowAll(Time.seconds(20))
//.trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
.process(new 
ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] {

private var itemState: MapState[String,Int] = _

override def open(parameters: Configuration): Unit = {
itemState = getRuntimeContext.getMapState(new 
MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int])))
  }

override def process(context: Context, elements: Iterable[IncreaseNumPerHour], 
out: Collector[IncreasePerHour]): Unit = {
var timestamp:Long = 0L
elements.foreach(kv => {
itemState.put(kv.category, 1)
timestamp = (kv.timestamp/2000+1)*2000
})
import scala.collection.JavaConversions._
out.collect(IncreasePerHour(new Timestamp( timestamp - 1 
).toString,itemState.keys().size))
  }
})


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 14:15,罗显宴<15927482...@163.com> 写道:


hi,
我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 13:58,shizk233 写道:
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:

好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

hi,


CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,


这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:




大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制




回复: (无主题)

2020-07-21 文章 罗显宴


hi,
我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 13:58,shizk233 写道:
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:

好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

hi,


CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,


这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:




大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制