Re: UDF:Type is not supported: ANY

2020-08-05 Thread Benchao Li
The code looks fine to me; that is exactly how we use it.
Which version of Flink are you on, and how did you register the UDF?

zilong xiao wrote on Thu, Aug 6, 2020 at 12:06 PM:

> I have tried writing it that way, but it did not seem to work. My code is below; could you take a look and tell me whether it should work?
>
> public class Json2Map extends ScalarFunction {
>
>private static final Logger LOG =
> LoggerFactory.getLogger(Json2Map.class);
>
>private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>
>public Json2Map(){}
>
>    public Map<String, String> eval(String param) {
>       Map<String, String> result = new HashMap<>();
>   try {
>  if (param == null) {
> return result;
>  }
>  result = OBJECT_MAPPER.readValue(param, Map.class);
>  // iterating the map and calling toString on each entry did not seem to work either
>  //for(Object obj : tmp.keySet()){
>  // result.put(String.valueOf(obj), String.valueOf(tmp.get(obj)));
>  //}
>  LOG.info("result is: {}", result);
>   } catch (JsonProcessingException e){
>  LOG.error("failed to convert json to map, param is: {}", param,
> e);
>   }
>   return result;
>}
>
>
>@Override
>    public TypeInformation<Map<String, String>> getResultType(Class<?>[] signature) {
>   return Types.MAP(Types.STRING, Types.STRING);
>}
>
> }
>
>
> Benchao Li wrote on Thu, Aug 6, 2020 at 11:04 AM:
>
> > You can have the UDF return a Map type directly, for example:
> >
> > public class String2Map extends ScalarFunction {
> >
> >    public Map<String, String> eval(String param) throws Exception {
> >       Map<String, String> map = new HashMap<>();
> >   // ...
> >   return map;
> >}
> >
> >@Override
> >    public TypeInformation<Map<String, String>> getResultType(Class<?>[] signature) {
> >   return Types.MAP(Types.STRING, Types.STRING);
> >}
> >
> > }
> >
> >
> > zilong xiao wrote on Thu, Aug 6, 2020 at 10:24 AM:
> >
> > > Thanks for the answer, Benchao. For Java users the Map container is also widely used. Right now some of our users need to pull the value of a given key out of a JSON string; the current approach is a getValueFromJson UDF, which has to deserialize the JSON into a Map and call get(key) on every access. Is there a way to let a UDF return a Map type directly, so that we do not have to deserialize and then look up the value every single time?
> > >
> > > Benchao Li wrote on Wed, Aug 5, 2020 at 11:49 PM:
> > >
> > > > Hi zilong,
> > > >
> > > > For the ARRAY type in SQL, the corresponding legacy type should be Types.PRIMITIVE_ARRAY or Types.OBJECT_ARRAY;
> > > > any other type information is treated as the ANY type.
> > > > This should have nothing to do with Java generics; the implementation simply never treated Types.LIST(Types.STRING) as the SQL ARRAY type.
> > > > Supporting List as the data behind ARRAY should only land in 1.12 [1].
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-18417
> > > >
> > > > zilong xiao wrote on Mon, Aug 3, 2020 at 8:23 PM:
> > > >
> > > > > That does not work; I tried it. Even iterating over the map and calling toString on every entry fails. It is probably caused by how Java generics work. I wonder how the community folks see this problem.
> > > > >
> > > > > godfrey he wrote on Mon, Aug 3, 2020 at 7:50 PM:
> > > > >
> > > > > > Try changing Map to Map<String, String>.
> > > > > >
> > > > > > zilong xiao wrote on Mon, Aug 3, 2020 at 4:56 PM:
> > > > > >
> > > > > > > For now the List case can be worked around by using an array instead, but I could not get Map to run successfully.
> > > > > > >
> > > > > > > zilong xiao wrote on Mon, Aug 3, 2020 at 10:43 AM:
> > > > > > >
> > > > > > > > While writing Flink SQL jobs recently I found that the built-in functions do not quite cover our needs. Very common ones such as Json2Array and Json2Map do not seem to be provided out of the box, so they have to be implemented as UDFs. When I tried implementing them with Jackson, SQL validation kept failing with `Type is not supported: ANY`. My guess was that this is related to Java generics: since Java generics are erased to Object at runtime, could that be what triggers the exception? Flink itself has a string-to-container function, STR_TO_MAP, and its implementation is written in Scala, so I am not sure whether the exception really comes from generics. If it does, how should Json2Array / Json2Map UDFs be written in Java? Any guidance from more experienced folks would be appreciated.
> > > > > > > >
> > > > > > > > The UDF code is as follows:
> > > > > > > >
> > > > > > > > public class Json2List extends ScalarFunction {
> > > > > > > >
> > > > > > > >    private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);
> > > > > > > >
> > > > > > > >    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
> > > > > > > >       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > > > > > >       .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true);
> > > > > > > >
> > > > > > > >    public Json2List(){}
> > > > > > > >
> > > > > > > >    public List<String> eval(String param) {
> > > > > > > >       List<String> result = new ArrayList<>();
> > > > > > > >       try {
> > > > > > > >          List<Map<String, Object>> list = OBJECT_MAPPER.readValue(param, List.class);
> > > > > > > >          for(Map<String, Object> map : list){
> > > > > > > >             result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > > > > >          }
> > > > > > > >          return result;
> > > > > > > >       } catch (JsonProcessingException e){
> > > > > > > >          LOG.error("failed to convert json to array, param is: {}", param, e);
> > > > > > > >       }
> > > > > > > >       return result;
> > > > > > > >    }
> > > > > > > >
> > > > > > > >    @Override
> > > > > > > >    public TypeInformation<List<String>> getResultType(Class<?>[] signature) {
> > > > > > > >       return Types.LIST(Types.STRING);
> > > > > > > >    }
> > > > > > > >
> > > > > > > > }
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li
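
For reference, a minimal self-contained sketch of the approach discussed in this thread, assuming Flink 1.10's legacy type extraction and Jackson on the classpath (the class, function and column names below are illustrative, not taken from the thread): the UDF returns Map<String, String> and declares its result type explicitly in getResultType, so the planner maps it to SQL MAP<STRING, STRING> instead of falling back to ANY.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonToMapFunction extends ScalarFunction {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns an empty map for null or malformed input instead of failing the job.
    public Map<String, String> eval(String json) {
        Map<String, String> result = new HashMap<>();
        if (json == null) {
            return result;
        }
        try {
            result = MAPPER.readValue(json, new TypeReference<Map<String, String>>() {});
        } catch (Exception e) {
            // swallow parse errors and fall through to the empty map
        }
        return result;
    }

    // Without this override the Map is extracted as a generic type and surfaces as ANY in SQL.
    @Override
    public TypeInformation<Map<String, String>> getResultType(Class<?>[] signature) {
        return Types.MAP(Types.STRING, Types.STRING);
    }
}

Registered with something like tableEnv.registerFunction("json_to_map", new JsonToMapFunction()), the map entries can then be read in SQL with the bracket accessor, e.g. json_to_map(payload)['user_id'], which avoids re-parsing the JSON once per key.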


Re: flink1.10.1/1.11.1: state keeps growing after SQL GROUP BY and time-window operations

2020-08-05 Thread Congxian Qiu
Hi
I do not see the attachments on my side; I am not sure whether it is a mail-client issue or something else. Could you double-check on your end whether the attachments went out?

Best,
Congxian


op <520075...@qq.com> wrote on Thu, Aug 6, 2020 at 10:36 AM:

> Thanks; the screenshots and configuration are in the attachments.
> I will try configuring the RocksDB StateBackend.
>
>
> ---------- Original message ----------
> *From:* "user-zh" ;
> *Sent:* Wednesday, August 5, 2020, 5:43 PM
> *To:* "user-zh";
> *Subject:* Re: flink1.10.1/1.11.1: state keeps growing after SQL GROUP BY and time-window operations
>
> Hi
>   The RocksDB StateBackend only needs a little configuration in flink-conf [1].
>
>   Also, some of the information in your previous two mails puzzles me. Could you paste the flink-conf you are currently using, a screenshot of the checkpoint UI, and a screenshot of the checkpoint
> directory on HDFS?
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
>
> Best,
> Congxian
>
>
> op <520075...@qq.com> wrote on Wed, Aug 5, 2020 at 4:03 PM:
>
> > Hi, the TTL configuration is
> > val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
> > val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> > val tConfig = tableEnv.getConfig
> > tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
> >
> >
> >   1) Currently three jobs all show this behaviour.
> >   2) The cluster has no RocksDB environment at the moment.
> > Thanks
> > ---------- Original message ----------
> > From: "user-zh" <qcx978132...@gmail.com>
> > Sent: Wednesday, August 5, 2020, 3:30 PM
> > To: "user-zh"
> > Subject: Re: flink1.10.1/1.11.1: state keeps growing after SQL GROUP BY and time-window operations
> >
> >
> >
> > Hi op
> >  This situation is rather odd. I would like to confirm:
> >  1) Do all of your jobs see the checkpoint size growing continuously, or only this type of job?
> >  2) Have you tried the RocksDBStateBackend (full and incremental)? How did it behave?
> >
> >  Also, how are the other TTL-related options configured?
> >
> > In principle a checkpoint is just a snapshot of the state, so if the checkpoints keep getting bigger, the state keeps getting bigger.
> > Best,
> > Congxian
> >
> >
> > op <520075...@qq.com> wrote on Wed, Aug 5, 2020 at 2:46 PM:
> >
> > > Hi, I am using the FsStateBackend. Raising the interval to 5 minutes makes no difference; each checkpoint takes around 300 ms, and our business data volume is basically the same every day.
> > > The idle-state retention time is set to 1440 minutes, so in theory the state size should level off after running for about a day. However, the job has now been running for 5 days,
> > > and the size of the checkpoint shared directory keeps growing. I have also confirmed that the GROUP BY keys only occur on the day being processed, i.e. that day's state becomes idle once the day has passed,
> > > so after 5 days of running the cleanup condition should have been met.
> >  ---------- Original message ----------
> >  From: "user-zh" <qcx978132...@gmail.com>
> >  Sent: Monday, August 3, 2020, 5:50 PM
> >  To: "user-zh"
> >  Subject: Re: flink1.10.1/1.11.1: state keeps growing after SQL GROUP BY and time-window operations
> >
> >  Hi
> >    Could you lengthen the checkpoint interval a bit and see whether it stabilizes? Judging from the data volume of the shared
> >  directory, it grows at first and then stays roughly flat.
> >  Note that Checkpointed Data Size is the size of the increment [1], not the size of the whole
> >  checkpoint; if a lot of data changes between checkpoints, this value gets larger.
> >
> >  [1]
> >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> >
> > Best,
> >  Congxian
> > 
> > 
> >  op <520075...@qq.com> wrote on Mon, Aug 3, 2020 at 2:18 PM:
> >
> >  > Same question here. I am also seeing the state grow and grow. I am on version 1.11.0, storing checkpoints on HDFS with a checkpoint interval of 3 minutes.
> >  > The logic is a GROUP BY on event day and id with a dozen or so aggregation metrics. It has been running for about 7 days and the state keeps increasing, even though an idle-state retention time is set and the watermark appears to be advancing normally:
> >  > tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> >  > Time.minutes(1440+10))
> >  >
> >  > ---------- Original message ----------
> >  > From: "user-zh" <384939...@qq.com>
> >  > Sent: Monday, August 3, 2020, 1:50 PM
> >  > To: "user-zh"
> >  > Subject: Re: flink1.10.1/1.11.1: state keeps growing after SQL GROUP BY and time-window operations
> >  >
> >  > Hi,
> >  > I switched back to incremental mode and collected some new data:
> >  > 1. Processing rate: 3000 records per second, in a test environment; the load is stable with almost no fluctuation.
> >  > 2. The checkpoint interval is set to 5 seconds.
> >  > 3. This job currently uses a one-minute window.
> >  > 4. Parallelism is 1, running in on-yarn mode.
> >  >
> >  > Right after startup it looks like this:
> >  > <http://apache-flink.147419.n8.nabble.com/file/t793/6.png>
> >  >
> >  > 18 minutes later:
> >  > <http://apache-flink.147419.n8.nabble.com/file/t793/9.png>
> >  >
> >  > Checkpoint settings:
> >  > <http://apache-flink.147419.n8.nabble.com/file/t793/conf.png>
> >  >
> >  > Size on HDFS:
> >  > <http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png>
> >  >
> >  > The size shown in the UI:
> >  > <
> >
> 
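
A minimal sketch of the two knobs discussed in this thread, assuming Flink 1.11 with the flink-statebackend-rocksdb dependency on the classpath; the checkpoint URI and intervals are placeholders. The same state backend choice can instead be made cluster-wide in flink-conf.yaml (state.backend: rocksdb, state.backend.incremental: true, state.checkpoints.dir: ...), which is what the reply above refers to.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IdleStateRetentionSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental RocksDB checkpoints: only the delta since the previous checkpoint is uploaded.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(3 * 60 * 1000);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // GROUP BY keys that are neither read nor written for 1440 minutes become eligible for
        // cleanup; cleanup is guaranteed to have happened by 1450 minutes (the max retention has
        // to be somewhat larger than the min).
        tableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450));

        // ... register the Kafka source and the sinks and submit the GROUP BY / window query here ...
    }
}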

Re: UDF:Type is not supported: ANY

2020-08-05 Thread zilong xiao
I have tried writing it that way, but it did not seem to work. My code is below; could you take a look and tell me whether it should work?

public class Json2Map extends ScalarFunction {

   private static final Logger LOG = LoggerFactory.getLogger(Json2Map.class);

   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

   public Json2Map(){}

   public Map<String, String> eval(String param) {
      Map<String, String> result = new HashMap<>();
      try {
         if (param == null) {
            return result;
         }
         result = OBJECT_MAPPER.readValue(param, Map.class);
         // iterating the map and calling toString on each entry did not seem to work either
         //for(Object obj : tmp.keySet()){
         // result.put(String.valueOf(obj), String.valueOf(tmp.get(obj)));
         //}
         LOG.info("result is: {}", result);
      } catch (JsonProcessingException e){
         LOG.error("failed to convert json to map, param is: {}", param, e);
      }
      return result;
   }


   @Override
   public TypeInformation<Map<String, String>> getResultType(Class<?>[] signature) {
      return Types.MAP(Types.STRING, Types.STRING);
   }

}


Benchao Li wrote on Thu, Aug 6, 2020 at 11:04 AM:

[snip]
>
> --
>
> Best,
> Benchao Li
>


flink1.11 es connector

2020-08-05 Thread Dream-底限
hi
We would like to use Elasticsearch as a temporal (lookup) table, but Flink does not provide an ES source connector for that, so we would need to implement one ourselves. Does using ES as a temporal table sound like a sensible idea to you? (PS: the reason we are not going with HBase is that designing the rowkey, and possibly maintaining secondary indexes, is a hassle; that said, HBase is still one of the options we are evaluating.)
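
As a rough, hedged sketch of what a hand-rolled Elasticsearch lookup could look like (this is not an existing Flink connector): a user-defined TableFunction that fetches one document per key over HTTP and is joined with LATERAL TABLE. The host, index and the choice to return the raw JSON document as a string are placeholder assumptions; a production version would need batching, caching, error handling and ideally a proper LookupTableSource for FOR SYSTEM_TIME AS OF semantics.

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class EsLookupFunction extends TableFunction<Row> {

    // e.g. "http://es-host:9200/my_index/_doc/" (placeholder)
    private final String baseUrl;

    public EsLookupFunction(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // One blocking HTTP GET per input key; emits (key, raw JSON document).
    public void eval(String id) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(baseUrl + id).openConnection();
        conn.setConnectTimeout(1000);
        conn.setReadTimeout(1000);
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
            String doc = scanner.hasNext() ? scanner.next() : "";
            collect(Row.of(id, doc));
        } finally {
            conn.disconnect();
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.STRING);
    }
}

Used roughly as: register it with tableEnv.registerFunction("es_lookup", new EsLookupFunction("http://es-host:9200/my_index/_doc/")) and join with SELECT ... FROM events, LATERAL TABLE(es_lookup(events.device_id)) AS t(k, doc).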


Re: Seeking advice: implementing real-time alerting with Flink

2020-08-05 Thread ????????
(Message body garbled by the archive's encoding; the recoverable fragments discuss using Flink CEP, GroupPattern and within/waiting behaviour for this use case, note that CEP has been available since Flink 1.7, and link to:)
https://developer.aliyun.com/article/738451

---------- Original message ----------
From: "user-zh"

> https://blog.csdn.net/zhangjun5965/article/details/106573528
>
> samuel@ubtrobot.com

Re: UDF:Type is not supported: ANY

2020-08-05 Thread Benchao Li
You can have the UDF return a Map type directly, for example:

public class String2Map extends ScalarFunction {

   public Map<String, String> eval(String param) throws Exception {
      Map<String, String> map = new HashMap<>();
  // ...
  return map;
   }

   @Override
   public TypeInformation<Map<String, String>> getResultType(Class<?>[] signature) {
  return Types.MAP(Types.STRING, Types.STRING);
   }

}


zilong xiao wrote on Thu, Aug 6, 2020 at 10:24 AM:

[snip]


-- 

Best,
Benchao Li


Re: Seeking advice: implementing real-time alerting with Flink

2020-08-05 Thread Jun Zhang
You can use broadcast state. I wrote an article about this that you can use as a reference; just swap the source for one that re-reads the rule configuration from MySQL every few seconds.

https://blog.csdn.net/zhangjun5965/article/details/106573528

samuel@ubtrobot.com  wrote on Thu, Aug 6, 2020 at 10:26 AM:

> We need a real-time alerting feature, and after some research Flink looks like a good fit for implementing it, but a few points are still unclear to me. Any guidance from the experts would be appreciated, thanks!
>
> The alerting has two parts:
>    One is the alert rule definitions. They are stored in MySQL in JSON form, e.g.
> {"times":5}  --- raise an alert when an event occurs more than 5 times;
> {"temperature": 80} --- raise an alert when the temperature exceeds 80.
>    The other is the alert evaluation:
>   1) the reported data is written to Kafka;
>   2) Flink reads the data from Kafka, computes over tumbling windows, and produces an alert whenever a rule is satisfied.
>
>
> The problems I am running into:
> 1. When a rule changes, how can the change take effect promptly?
> 2. If this is implemented with Flink CEP, can multiple rules be active against the same source at the same time within a single job?
> 3. Is there a best practice for this kind of feature?
>
> I hope someone can help. Thank you!
>
>
>
>
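
A minimal sketch of the broadcast-state pattern Jun Zhang suggests above, assuming the rules are re-read from MySQL every few seconds by a custom source (the polling source itself is not shown) and the events come from Kafka; Rule and Event are placeholder POJOs, and only the {"temperature": 80} style of rule is evaluated here.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicAlerting {

    // Broadcast state: rule name -> threshold, e.g. "temperature" -> 80.
    static final MapStateDescriptor<String, Integer> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.INT);

    public static DataStream<String> wire(DataStream<Event> events, DataStream<Rule> ruleUpdates) {
        BroadcastStream<Rule> broadcastRules = ruleUpdates.broadcast(RULES);

        return events
                .connect(broadcastRules)
                .process(new BroadcastProcessFunction<Event, Rule, String>() {

                    @Override
                    public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Every event sees the latest broadcast rule without restarting the job.
                        Integer threshold = ctx.getBroadcastState(RULES).get("temperature");
                        if (threshold != null && event.temperature > threshold) {
                            out.collect("ALERT: device " + event.deviceId + " temperature " + event.temperature + " > " + threshold);
                        }
                    }

                    @Override
                    public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {
                        // A rule update replaces the previous threshold for that rule name.
                        ctx.getBroadcastState(RULES).put(rule.name, rule.threshold);
                    }
                });
    }

    public static class Event { public long deviceId; public int temperature; }
    public static class Rule { public String name; public int threshold; }
}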


Re: flink1.10.1/1.11.1: state keeps growing after SQL GROUP BY and time-window operations

2020-08-05 Thread op
Thanks; the screenshots and configuration are in the attachments.
I will try configuring the RocksDB StateBackend.

---------- Original message ----------
From: "user-zh"

> The RocksDB StateBackend only needs a little configuration in flink-conf [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
>
> Best,
> Congxian
>
> op <520075...@qq.com> wrote on Wed, Aug 5, 2020 at 4:03 PM:
>
>  Hi, the TTL configuration is
>  val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
>  val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
>  val tConfig = tableEnv.getConfig
>  tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
>
>  1) Currently three jobs all show this behaviour.
>  2) The cluster has no RocksDB environment at the moment.
>
> [snip]


Seeking advice: implementing real-time alerting with Flink

2020-08-05 Thread samuel....@ubtrobot.com
We need a real-time alerting feature, and after some research Flink looks like a good fit for implementing it, but a few points are still unclear to me. Any guidance would be appreciated, thanks!

The alerting has two parts:
   One is the alert rule definitions. They are stored in MySQL in JSON form, e.g.
{"times":5}  --- raise an alert when an event occurs more than 5 times;
{"temperature": 80} --- raise an alert when the temperature exceeds 80.
   The other is the alert evaluation:
  1) the reported data is written to Kafka;
  2) Flink reads the data from Kafka, computes over tumbling windows, and produces an alert whenever a rule is satisfied.


The problems I am running into:
1. When a rule changes, how can the change take effect promptly?
2. If this is implemented with Flink CEP, can multiple rules be active against the same source at the same time within a single job?
3. Is there a best practice for this kind of feature?

I hope someone can help. Thank you!


Re: UDF:Type is not supported: ANY

2020-08-05 Thread zilong xiao
Thanks for the answer, Benchao. For Java users the Map container is also widely used. Right now some of our users need to pull the value of a given key out of a JSON string; the current approach is a getValueFromJson UDF, which has to deserialize the JSON into a Map and call get(key) on every access. Is there a way to let a UDF return a Map type directly, so that we do not have to deserialize and then look up the value every single time?

Benchao Li wrote on Wed, Aug 5, 2020 at 11:49 PM:

[snip]
>
>
> --
>
> Best,
> Benchao Li
>


Re: Re: Re: FLINK SQL view data reuse question

2020-08-05 Thread godfrey he
The sql-client does not support this yet. As for supporting statement sets as pure SQL text,
the community has already agreed on the syntax, so support should follow step by step.

kandy.wang wrote on Wed, Aug 5, 2020 at 10:43 PM:

>
>
>
>
>
>
> @godfrey
> The StatementSet submission approach you describe is not supported when submitting jobs through the sql-client, right? Could that be added?
>
>
>
>
>
>
>
>
>
>
>
> On 2020-08-04 19:36:56, "godfrey he" wrote:
> >Call StatementSet#explain() and print the result to check whether reuse fails because the Deduplicate nodes have different digests.
> >
> >kandy.wang wrote on Tue, Aug 4, 2020 at 6:21 PM:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> @godfrey
> >> Thanks. I just tried it: for source -> Deduplicate ->
> >> GlobalGroupAggregate, the source is indeed reused, but the Deduplicate is not. In theory both the source and
> >> the Deduplicate belong to the view's logic, so both should be reused. It feels like not enough is being shared yet.
> >>
> >>
> >> On 2020-08-04 17:26:02, "godfrey he" wrote:
> >> >The blink planner can optimize a multi-sink query so that duplicated computation is reused as much as possible.
> >> >In 1.11, submit the job with a StatementSet; before 1.11, use sqlUpdate/insertInto + execute.
> >> >
> >> >kandy.wang wrote on Tue, Aug 4, 2020 at 5:20 PM:
> >> >
> >> >> A question about FLINK SQL views:
> >> >> create view order_source
> >> >>
> >> >> as
> >> >>
> >> >> select order_id, order_goods_id, user_id,...
> >> >>
> >> >> from (
> >> >>
> >> >> ..  proctime,row_number() over(partition by order_id,
> >> >> order_goods_id order by proctime desc) as rownum
> >> >>
> >> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
> >> properties.group.id'='flink_etl_kafka_hbase',
> >> >> 'scan.startup.mode'='latest-offset') */
> >> >>
> >> >> ) where  rownum = 1 and  price > 0;
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT),)
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_date as rowkey,
> >> >>
> >> >> sum(amount) as saleN,
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_date
> >> >>
> >> >> );
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT))
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_hour as rowkey,sum(amount) as saleN,
> >> >>
> >> >>
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_hour
> >> >>
> >> >> );
> >> >> The question: the same view, with the same consumer group but different sinks, produces 2 jobs, which means the 2 jobs end up sharing one consumer
> >> >> group. The resulting jobs are:  a.  order_source -> sink 1    b.  order_source -> sink 2
> >> >>
> >> >> The intent was to use the view order_source
> >> >> (the view has to deduplicate the order data) to reuse a single full copy of the source data, and underneath to reuse the same state. How can that be achieved?
> >> >>
> >> >>
> >>
>
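
A minimal, self-contained sketch of the StatementSet submission godfrey he refers to above, assuming Flink 1.11 with the blink planner; the datagen/print tables are simplified stand-ins for the thread's Kafka source and real sinks, and the view omits the ROW_NUMBER deduplication for brevity. Putting both INSERTs into one StatementSet yields a single job, and explain() shows whether the source (and Deduplicate) operators are actually shared.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSinkViewReuse {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tableEnv.executeSql("CREATE TABLE orders (order_id INT, amount INT) WITH ('connector' = 'datagen')");
        tableEnv.executeSql("CREATE TABLE sink_day (total BIGINT) WITH ('connector' = 'print')");
        tableEnv.executeSql("CREATE TABLE sink_hour (total BIGINT) WITH ('connector' = 'print')");

        // Stand-in for the order_source view from the thread.
        tableEnv.executeSql("CREATE VIEW order_source AS SELECT order_id, amount FROM orders");

        StatementSet set = tableEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_day SELECT CAST(SUM(amount) AS BIGINT) FROM order_source");
        set.addInsertSql("INSERT INTO sink_hour SELECT CAST(SUM(amount) AS BIGINT) FROM order_source");

        // One optimized plan for both sinks; the printed plan shows which operators are reused.
        System.out.println(set.explain());
        set.execute();
    }
}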


Re: Deploying PyFlink on ARM / CentOS 7

2020-08-05 Thread 琴师
(Message body garbled by the archive's encoding; the recoverable quoted content is Xingbo Huang's reply, explaining that pyflink-shell.sh only launches the Python shell and pointing to:)
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/python_shell.html

> Best,
> Xingbo
>
> 琴师 <1129656...@qq.com> wrote on Thu, Aug 6, 2020 at 9:11 AM:
>
>  Hi:
>  I did a full build of flink-1.11.1-src.tgz with mvn clean package -DskipTest, and I see flink-1.11.1/build-target/bin/pyflink-shell.sh. What is this script for, and does using it amount to having PyFlink?
>
> [snip]

Re: Deploying PyFlink on ARM / CentOS 7

2020-08-05 Thread Xingbo Huang
Hi,
Definitely not. That script only launches the Python shell. Keep in mind that what you compiled with mvn is just the Java code; it does not contain PyFlink's Python code at all. The pyflink packages that your Python job imports
will not be resolvable by Python, so the job simply cannot run. If you want to try the Python shell, see the documentation [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/python_shell.html

Best,
Xingbo

琴师 <1129656...@qq.com> wrote on Thu, Aug 6, 2020 at 9:11 AM:

> Hi:
>   I did a full build of flink-1.11.1-src.tgz with mvn clean package
> -DskipTest, and I see flink-1.11.1/build-target/bin/pyflink-shell.sh. What is this script for, and does using it amount to having PyFlink?
>
>
> ---------- Original message ----------
> From: "琴师" <1129656...@qq.com>
> Sent: Tuesday, August 4, 2020, 10:19 AM
> To: "user-zh"
> Subject: Re: Deploying PyFlink on ARM / CentOS 7
>
>
>
> Indeed, I tried to compile pyarrow but could not get it to finish, and it is not the only third-party package affected.
> What a pity.
>
>
>
>
> ---------- Original message ----------
> From: "user-zh" <hxbks...@gmail.com>
> Sent: Tuesday, August 4, 2020, 10:09 AM
> To: "user-zh"
> Subject: Re: Deploying PyFlink on ARM / CentOS 7
>
>
>
> Hello,
>
> PyFlink depends on the pyarrow package, and pyarrow does not publish a wheel for ARM. So when you install it, pip downloads the pyarrow source package and tries to build it from source, which is where the cmake failure in your error message comes from.
>
> The only feasible route right now is to try to build and install pyarrow by hand. However, pyarrow is not tested on the ARM architecture, and newer pyarrow releases do not even publish a source package, so I rather doubt you will manage to build and install pyarrow successfully.
>
> Best,
> Xingbo
>
> 琴师 <1129656...@qq.com> wrote on Tue, Aug 4, 2020 at 9:57 AM:
>
>  Hello:
>  I am trying to deploy PyFlink on CentOS on an ARM architecture, with Python 3.5.9 (3.7.1 was also tried). Neither attempt could be completed; both get stuck at the pyarrow step. Is this a platform limitation, or a problem with my deployment? If it is my deployment, is there a known solution?
>  ERROR: Command errored out with exit status 1:
>   command: /usr/local/python3/bin/python3.5 /usr/local/python3/lib/python3.5/site-packages/pip/_vendor/pep517/_in_process.py build_wheel /tmp/tmpfda_qhew
>       cwd: /tmp/pip-install-r4b5m7u0/pyarrow
>   Complete output (428 lines):
>   running bdist_wheel
>   running build
>   running build_py
>   creating build
>   creating build/lib.linux-aarch64-3.5
>   creating build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/jvm.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/benchmark.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/_generated_version.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/plasma.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/hdfs.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/cuda.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/orc.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/filesystem.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/parquet.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/flight.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/types.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/ipc.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/fs.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/compat.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/json.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/__init__.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/csv.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/util.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/pandas_compat.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/serialization.py -> build/lib.linux-aarch64-3.5/pyarrow
>   copying pyarrow/feather.py -> build/lib.linux-aarch64-3.5/pyarrow
>   creating build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_jvm.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_deprecations.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_pandas.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_feather.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_gandiva.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_fs.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_json.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_ipc.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_plasma_tf_op.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_misc.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/test_scalars.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/conftest.py -> build/lib.linux-aarch64-3.5/pyarrow/tests
>   copying pyarrow/tests/strategies.py ->
> 

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 Thread jincheng sun
Hi David, Thank you for sharing the problems with the current document, and
I agree with you as I also got the same feedback from Chinese users. I am
often contacted by users to ask questions such as whether PyFlink supports
"Java UDF" and whether PyFlink supports "xxxConnector". The root cause of
these problems is that our existing documents are based on Java users (text
and API mixed part). Since Python is newly added from 1.9, many document
information is not friendly to Python users. They don't want to look for
Python content in unfamiliar Java documents. Just yesterday, there were
complaints from Chinese users about where is all the document entries of
 Python API. So, have a centralized entry and clear document structure,
which is the urgent demand of Python users. The original intention of FLIP
is do our best to solve these user pain points.

Hi Xingbo and Wei Thank you for sharing PySpark's status on document
optimization. You're right. PySpark already has a lot of Python user
groups. They also find that Python user community is an important position
for multilingual support. The centralization and unification of Python
document content will reduce the learning cost of Python users, and good
document structure and content will also reduce the Q & A burden of the
community, It's a once and for all job.

Hi Seth, I wonder if your concerns have been resolved through the previous
discussion?

Anyway, the principle of FLIP is that in python document should only
include Python specific content, instead of making a copy of the Java
content. And would be great to have you to join in the improvement for
PyFlink (Both PRs and Review PRs).

Best,
Jincheng


Wei Zhong wrote on Wed, Aug 5, 2020 at 5:46 PM:

> Hi Xingbo,
>
> Thanks for your information.
>
> I think the PySpark's documentation redesigning deserves our attention. It
> seems that the Spark community has also begun to treat the user experience
> of Python documentation more seriously. We can continue to pay attention to
> the discussion and progress of the redesigning in the Spark community. It
> is so similar to our working that there should be some ideas worthy for us.
>
> Best,
> Wei
>
>
> On Aug 5, 2020, at 15:02, Xingbo Huang wrote:
>
> Hi,
>
> I found that the spark community is also working on redesigning pyspark
> documentation[1] recently. Maybe we can compare the difference between our
> document structure and its document structure.
>
> [1] https://issues.apache.org/jira/browse/SPARK-31851
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html
>
> Best,
> Xingbo
>
> David Anderson wrote on Wed, Aug 5, 2020 at 3:17 AM:
>
>> I'm delighted to see energy going into improving the documentation.
>>
>> With the current documentation, I get a lot of questions that I believe
>> reflect two fundamental problems with what we currently provide:
>>
>> (1) We have a lot of contextual information in our heads about how Flink
>> works, and we are able to use that knowledge to make reasonable inferences
>> about how things (probably) work in cases we aren't so familiar with. For
>> example, I get a lot of questions of the form "If I use  will
>> I still have exactly once guarantees?" The answer is always yes, but they
>> continue to have doubts because we have failed to clearly communicate this
>> fundamental, underlying principle.
>>
>> This specific example about fault tolerance applies across all of the
>> Flink docs, but the general idea can also be applied to the Table/SQL and
>> PyFlink docs. The guiding principles underlying these APIs should be
>> written down in one easy-to-find place.
>>
>> (2) The other kind of question I get a lot is "Can I do  with ?"
>> E.g., "Can I use the JDBC table sink from PyFlink?" These questions can be
>> very difficult to answer because it is frequently the case that one has to
>> reason about why a given feature doesn't seem to appear in the
>> documentation. It could be that I'm looking in the wrong place, or it could
>> be that someone forgot to document something, or it could be that it can in
>> fact be done by applying a general mechanism in a specific way that I
>> haven't thought of -- as in this case, where one can use a JDBC sink from
>> Python if one thinks to use DDL.
>>
>> So I think it would be helpful to be explicit about both what is, and
>> what is not, supported in PyFlink. And to have some very clear organizing
>> principles in the documentation so that users can quickly learn where to
>> look for specific facts.
>>
>> Regards,
>> David
>>
>>
>> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun 
>> wrote:
>>
>>> Hi Seth and David,
>>>
>>> I'm very happy to have your reply and suggestions. I would like to share
>>> my thoughts here:
>>>
>>> The main motivation we want to refactor the PyFlink doc is that we want
>>> to make sure that the Python users could find all they want starting from
>>> the PyFlink documentation mainpage. That’s, the PyFlink documentation

Re: Deploying PyFlink on ARM / CentOS 7

2020-08-05 Thread 琴师
Hi:
  I did a full build of flink-1.11.1-src.tgz with mvn clean package -DskipTest, and I see flink-1.11.1/build-target/bin/pyflink-shell.sh. What is this script for, and does using it amount to having PyFlink?


---------- Original message ----------
From: "琴师" <1129656...@qq.com>
Sent: Tuesday, August 4, 2020, 10:19 AM
To: "user-zh"

Re: Issue with single job yarn flink cluster HA

2020-08-05 Thread Ken Krugler
Hi Dinesh,

Did updating to Flink 1.10 resolve the issue?

Thanks,

— Ken

> Hi Andrey,
> Sure We will try to use Flink 1.10 to see if HA issues we are facing is fixed 
> and update in this thread.
> 
> Thanks,
> Dinesh
> 
> On Thu, Apr 2, 2020 at 3:22 PM Andrey Zagrebin  > wrote:
> Hi Dinesh,
> 
> Thanks for sharing the logs. There were couple of HA fixes since 1.7, e.g. 
> [1] and [2].
> I would suggest to try Flink 1.10.
> If the problem persists, could you also find the logs of the failed Job 
> Manager before the failover?
> 
> Best,
> Andrey
> 
> [1] https://jira.apache.org/jira/browse/FLINK-14316 
> 
> [2] https://jira.apache.org/jira/browse/FLINK-11843 
> 
> On Tue, Mar 31, 2020 at 6:49 AM Dinesh J  > wrote:
> Hi Yang,
> I am attaching one full jobmanager log for a job which I reran today. This a 
> job that tries to read from savepoint.
> Same error message "leader election onging" is displayed. And this stays the 
> same even after 30 minutes. If I leave the job without yarn kill, it stays 
> the same forever.
> Based on your suggestions till now, I guess it might be some zookeeper 
> problem. If that is the case, what can I lookout for in zookeeper to figure 
> out the issue?
> 
> Thanks,
> Dinesh


[snip]

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
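
For context, the ZooKeeper-based HA that this leader election belongs to is configured in flink-conf.yaml; a sketch with placeholder hosts and paths (not the poster's actual configuration), plus the YARN restart-attempt setting that per-job clusters rely on:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/ha/
yarn.application-attempts: 10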



Flink Prometheus MetricsReporter Question

2020-08-05 Thread Avijit Saha
Hi,

Have a general question about Flink support for Prometheus metrics. We
already have a Prometheus setup in our cluster with ServiceMonitor-s
monitoring ports like 8080 etc. for scraping metrics.

In a setup like this, if we deploy Flink Job managers/Task managers in the
cluster, is there any need to have the PrometheusReporter configured as
well? How does that coordinate with existing Prometheus ServiceMonitors if
present?

Is the  PrometheusReporter based on "pull" model so that it can pull
metrics from Flink and send to some Prometheus host system?

Thanks
Avijit
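
For reference, the PrometheusReporter is pull-based: it starts an HTTP endpoint on every JobManager and TaskManager that Prometheus scrapes, so an existing ServiceMonitor only needs to be pointed at that port in addition to the application ports; without a metrics reporter, Flink does not expose its metrics in Prometheus format. (There is also a separate PrometheusPushGatewayReporter for push setups.) A flink-conf.yaml sketch, with an example port range:

metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249-9260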


Re: Flink CPU load metrics in K8s

2020-08-05 Thread Bajaj, Abhinav
Thanks Roman for providing the details.

I also made more observations that have increased my confusion about this topic.
To ease the calculations, I deployed a test cluster, this time providing 1 CPU
in K8s (with docker) for all the taskmanager containers.

When I check the taskmanager CPU load, the value is in the order of 
"0.002158428663932657".
Assuming that the underlying JVM recognizes 1 CPU allocated to the docker 
container, this values means % CPU usage in ball park of 0.21%.

However, if I look at the K8s metrics(formula below) for this container – it 
turns out in the ball park of 10-16%.
There is no other process running in the container apart from the flink 
taskmanager.

The order of these two values of CPU % usage is different.

Am I comparing the right metrics here?
How are folks running Flink on K8s monitoring the CPU load?

~ Abhi

% CPU usage from K8s metrics
sum(rate(container_cpu_usage_seconds_total{pod=~"my-taskmanagers-*", 
container="taskmanager"}[5m])) by (pod)
/ sum(container_spec_cpu_quota{pod=~"my-taskmanager-pod-*", 
container="taskmanager"}
/ container_spec_cpu_period{pod=~"my-taskmanager-pod-*", 
container="taskmanager"}) by (pod)

From: Roman Grebennikov 
Date: Tuesday, August 4, 2020 at 12:42 AM
To: "user@flink.apache.org" 
Subject: Re: Flink CPU load metrics in K8s


Hi,

JVM.CPU.Load is just a wrapper (MetricUtils.instantiateCPUMetrics) on top of 
OperatingSystemMXBean.getProcessCpuLoad (see 
https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad())

Usually it looks weird if you have multiple CPU cores. For example, if you have 
a job with a single slot 100% utilizing a single CPU core on a 8 core machine, 
the JVM.CPU.Load will be 1.0/8.0 = 0.125. It's also a point-in-time snapshot of 
current CPU usage, so if you're collecting your metrics every minute, and the 
job has spiky workload within this minute (like it's idle almost always and 
once in a minute it consumes 100% CPU for one second), so you have a chance to 
completely miss this from the metrics.

As for me personally, JVM.CPU.Time is more clear indicator of CPU usage, which 
is always increasing amount of milliseconds CPU spent executing your code. And 
it will also catch CPU usage spikes.

Roman Grebennikov | g...@dfdx.me


On Mon, Aug 3, 2020, at 23:34, Bajaj, Abhinav wrote:

Hi,



I am trying to understand the CPU Load metrics reported by Flink 1.7.1 running 
with openjdk 1.8.0_212 on K8s.



After deploying the Flink Job on K8s, I tried to get CPU Load metrics following 
this 
documentation.

curl 
localhost:8081/taskmanagers/7737ac33b311ea0a696422680711597b/metrics?get=Status.JVM.CPU.Load,Status.JVM.CPU.Time

[{"id":"Status.JVM.CPU.Load","value":"0.0023815194093831865"},{"id":"Status.JVM.CPU.Time","value":"2326000"}]



The value of the CPU load looks odd to me.



What is the unit and scale of this value?

How does Flink determine this value?



Appreciate your time and help here.

~ Abhinav Bajaj
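
A small probe of the underlying JMX value, for sanity-checking the numbers discussed above; it assumes a HotSpot/OpenJDK JVM, where Status.JVM.CPU.Load is backed by com.sun.management.OperatingSystemMXBean#getProcessCpuLoad, a point-in-time fraction in [0, 1] of all CPUs the JVM believes it has. If the JVM sees the node's cores rather than the container's CPU limit, this fraction and a container_cpu_usage_seconds_total based percentage will differ roughly by that core ratio, which is worth verifying for your JVM and container runtime versions.

import java.lang.management.ManagementFactory;

public class CpuLoadProbe {
    public static void main(String[] args) throws InterruptedException {
        com.sun.management.OperatingSystemMXBean os =
                (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        for (int i = 0; i < 5; i++) {
            // getProcessCpuLoad(): recent CPU usage of this JVM process; getProcessCpuTime(): total CPU ns.
            System.out.printf("availableProcessors=%d processCpuLoad=%.4f processCpuTimeNs=%d%n",
                    Runtime.getRuntime().availableProcessors(),
                    os.getProcessCpuLoad(),
                    os.getProcessCpuTime());
            Thread.sleep(1000);
        }
    }
}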





Re: Two Queries and a Kafka Topic

2020-08-05 Thread Marco Villalobos
Hi Theo,

Thank you.

I just read the State Processor API in an effort to understand Option 1, it 
seems though I can just use a KeyedProcessFunction that loads the data just 
once (maybe on the "open" method), and serialize the values into MapState and 
use it from that point on.

Another option in the documentation is the CheckpointedFunction type, though it was not clear to me from the documentation how to use it.

My data shares a common key, so this might be doable in KeyedProcessFunction.

Is that what you're suggesting?

Again, Thank you.

Marco A, Villalobos


> On Aug 5, 2020, at 3:52 AM, Theo Diefenthal 
>  wrote:
> 
> Hi Marco,
> 
> In general, I see three solutions here you could approach: 
> 
> 1. Use the StateProcessorAPI: You can run a program with the 
> stateProcessorAPI that loads the data from JDBC and stores it into a Flink 
> SavePoint. Afterwards, you start your streaming job from that savepoint which 
> will load its state and within find all the data from JDBC stored already. 
> 2. Load from master, distribute with the job: When you build up your 
> jobgraph, you could execute the JDBC queries and put the result into some 
> Serializable class which in turn you plug in a an operator in your stream 
> (e.g. a map stage). The class along with all the queried data will be 
> serialized and deserialized on the taskmanagers (Usually, I use this for 
> configuration parameters, but it might be ok in this case as well)
> 3. Load from TaskManager: In your map-function, if the very first event is 
> received, you can block processing and synchronously load the data from JDBC 
> (So each Taskmanager performs the JDBC query itself). You then keep the data 
> to be used for all subsequent map steps. 
> 
> I think, option 3 is the easiest to be implemented while option 1 might be 
> the most elegant way in my opinion. 
> 
> Best regards
> Theo
> 
> Von: "Marco Villalobos" 
> An: "Leonard Xu" 
> CC: "user" 
> Gesendet: Mittwoch, 5. August 2020 04:33:23
> Betreff: Re: Two Queries and a Kafka Topic
> 
> Hi Leonard,
> 
> First, Thank you.
> 
> I am currently trying to restrict my solution to Apache Flink 1.10 because 
> its the current version supported by Amazon EMR.
> i am not ready to change our operational environment to solve this.
> 
> Second, I am using the DataStream API.  The Kafka Topic is not in a table, it 
> is in a DataStream.
> 
> The SQL queries are literally from a PostgresSQL database, and only need to 
> be run exactly once in the lifetime of the job.
> 
> I am struggling to determine where this happens.
> 
> JDBCInputFormat seems to query the SQL table repetitively, and also 
> connecting streams and aggregating into one object is very complicated.
> 
> Thus, I am wondering what is the right approach.  
> 
> Let me restate the parameters.
> 
> SQL Query One = data in PostgreSQL (200K records) that is used for business 
> logic.
> SQL Query Two = data in PostgreSQL (1000 records) that is used for business 
> logic.
> Kafka Topic One = unlimited data-stream that uses the data-stream api and 
> queries above to write into multiple sinks
> 
> ASCII diagram:
>
> [SQL Query One] ---> [Aggregate to Map] ------------------+
>                                                           |
>                                                           v
> Kafka [Kafka Topic One] ---> [Keyed Process Function (Query One Map, Query Two Map)] ---> [Multiple Sinks]
>                                                           ^
>                                                           |
> [SQL Query Two] ---> [Aggregate to Map] ------------------+
>
>
> Maybe my graph above helps.  You see, I need Query One and Query Two only 
> ever execute once.  After that the information they provide are used to 
> correctly process the Kafka Topic.
> 
> I'll take a deep further to try and understand what you said, thank you, but 
> JDBCInputFormat seem to repetitively query the database.  Maybe I need to 
> write a RichFunction or AsyncIO function and cache the results in state after 
> that.
> 
> 
> 
> On Aug 4, 2020, at 6:25 PM, Leonard Xu  > wrote:
> 
> Hi, Marco
> 
> If I need SQL Query One and SQL Query Two to happen just one time,
> 
> Looks like you want to reuse this kafka table in one job, It’s supported to 
> execute multiple query in one sql job in Flink 1.11. 
> You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a 
> single SQL job[1].
> 
> 
> Best
> Leonard 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
>  
> 
> 
> 
> On Aug 5, 2020, at 04:34, Marco Villalobos wrote:
> 
> Lets say that I have:
> 
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
> 
> Let's also say that main data from this Flink job arrives in Kafka Topic One.
> 
> If I need SQL Query One and SQL Query Two to 
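
A minimal sketch of the "load once per task in open()" idea (Theo's option 3 / the RichFunction approach Marco mentions), assuming the reference data fits comfortably in memory; the JDBC URL, credentials and queries are placeholders. Each parallel instance runs the PostgreSQL queries once when it opens and then only reads the in-memory map while processing the Kafka stream. If the data must live in Flink managed state instead, the KeyedProcessFunction + MapState variant from the discussion is the alternative.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichWithReferenceData extends RichMapFunction<String, String> {

    private transient Map<String, String> referenceData;

    @Override
    public void open(Configuration parameters) throws Exception {
        referenceData = new HashMap<>();
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://db-host:5432/mydb", "user", "secret");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT key_col, value_col FROM reference_table_one")) {
            while (rs.next()) {
                referenceData.put(rs.getString("key_col"), rs.getString("value_col"));
            }
        }
        // Run the second (1000-row) query the same way and merge it into the map or a second map.
    }

    @Override
    public String map(String kafkaRecord) {
        // Enrich/route each Kafka record using the data loaded once in open().
        String extra = referenceData.getOrDefault(kafkaRecord, "unknown");
        return kafkaRecord + "," + extra;
    }
}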

Re: Handle idle kafka source in Flink 1.9

2020-08-05 Thread bat man
Hello Arvid,

Thanks for the suggestion/reference and my apologies for the late reply.

With this I am able to process the data with some topics not having regular
data. Obviously, late data is being handheld as in side-output and has a
process for it.
One challenge is handling the back-fill: when I run the job on old data, the
watermark (with maxOutOfOrderness set to 10 minutes) causes the older data to be
filtered out as late. To handle this I am thinking of running the side-input with
maxOutOfOrderness stretched back to the oldest data, while the regular job keeps
the normal setting.

Thanks,
Hemant

On Thu, Jul 30, 2020 at 2:41 PM Arvid Heise  wrote:

> Hi Hemant,
>
> sorry for the late reply.
>
> You can just create your own watermark assigner and either copy the
> assigner from Flink 1.11 or take the one that we use in our trainings [1].
>
> [1]
> https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
>
> On Thu, Jul 23, 2020 at 8:48 PM bat man  wrote:
>
>> Thanks Niels for a great talk. You have covered two of my pain areas -
>> slim and broken streams. Since I am dealing with device data from on-prem
>> data centers. The first option of generating fabricated watermark events is
>> fine, however as mentioned in your talk how are you handling forwarding it
>> to the next stream(next kafka topic) after enrichment. Have you got any
>> solution for this?
>>
>> -Hemant
>>
>> On Thu, Jul 23, 2020 at 12:05 PM Niels Basjes  wrote:
>>
>>> Have a look at this presentation I gave a few weeks ago.
>>> https://youtu.be/bQmz7JOmE_4
>>>
>>> Niels Basjes
>>>
>>> On Wed, 22 Jul 2020, 08:51 bat man,  wrote:
>>>
 Hi Team,

 Can someone share their experiences handling this.

 Thanks.

 On Tue, Jul 21, 2020 at 11:30 AM bat man  wrote:

> Hello,
>
> I have a pipeline which consumes data from a Kafka source. Since, the
> partitions are partitioned by device_id in case a group of devices is down
> some partitions will not get normal flow of data.
> I understand from documentation here[1] in flink 1.11 one can declare
> the source idle -
> WatermarkStrategy.>forBoundedOutOfOrderness(
> Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>
> How can I handle this in 1.9, since I am using aws emr and emr doesn't
> have any release with the latest flink version.
>
> One way I could think of is to trigger watermark generation every 10
> minutes or so using Periodic watermarks. However, this will not be full
> proof, are there any better way to handle this more dynamically.
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>
> Thanks,
> Hemant
>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>
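
A sketch of the "fabricated watermark" idea for Flink 1.9 (the linked training repository is the authoritative example; this is a simplified variant with placeholder constants): a periodic assigner that normally derives the watermark from the max event timestamp, but once no events have arrived for maxIdleTimeMs it lets the watermark advance from the wall clock so that idle partitions or slim topics do not hold downstream windows back.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    private final long maxOutOfOrdernessMs;
    private final long maxIdleTimeMs;

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventProcTime = Long.MIN_VALUE;

    public IdleAwareWatermarkAssigner(long maxOutOfOrdernessMs, long maxIdleTimeMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.maxIdleTimeMs = maxIdleTimeMs;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // Assumes timestamps were already attached upstream (e.g. Kafka record timestamps).
        maxTimestamp = Math.max(maxTimestamp, previousElementTimestamp);
        lastEventProcTime = System.currentTimeMillis();
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return null; // nothing seen yet, emit no watermark
        }
        long now = System.currentTimeMillis();
        long basis = maxTimestamp;
        if (now - lastEventProcTime > maxIdleTimeMs) {
            // Source looks idle: fabricate progress from the wall clock instead of stalling.
            basis = Math.max(basis, now - maxIdleTimeMs);
        }
        return new Watermark(basis - maxOutOfOrdernessMs);
    }
}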


Re: Status of a job when a kafka source dies

2020-08-05 Thread Piotr Nowojski
Hi Nick,

Could you elaborate more, what event and how would you like Flink to
handle? Is there some kind of Kafka's API that can be used to listen to
such kind of events? Becket, do you maybe know something about this?

As a side note Nick, can not you configure some timeouts [1] in the
KafkaConsumer? Like `request.timeout.ms` or `consumer.timeout.ms`? But as I
wrote before, that would be more a question to Kafka guys.

Piotrek

[1] http://kafka.apache.org/20/documentation/

śr., 5 sie 2020 o 19:58 Nick Bendtner  napisał(a):

> +user group.
>
> On Wed, Aug 5, 2020 at 12:57 PM Nick Bendtner  wrote:
>
>> Thanks Piotr but shouldn't this event be handled by the
>> FlinkKafkaConsumer since the poll happens inside the FlinkKafkaConsumer.
>> How can I catch this event in my code since I don't have control over the
>> poll.
>>
>> Best,
>> Nick.
>>
>> On Wed, Aug 5, 2020 at 12:14 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi Nick,
>>>
>>> What Aljoscha was trying to say is that Flink is not trying to do any
>>> magic. If `KafkaConsumer` - which is being used under the hood of
>>> `FlinkKafkaConsumer` connector - throws an exception, this
>>> exception bubbles up causing the job to failover. If the failure is handled
>>> by the `KafkaConsumer` silently, that's what's happening. As we can in the
>>> TM log that you attached, the latter seems to be happening - note that the
>>> warning is logged by "org.apache.kafka.clients.NetworkClient" package, so
>>> that's not the code we (Flink developers) control.
>>>
>>> If you want to change this behaviour, unless someone here on this
>>> mailing list just happens to know the answer, the better place to ask such
>>> a question on the Kafka mailing list. Maybe there is some way to configure
>>> this.
>>>
>>> And sorry I don't know much about neither the KafkaConsumer nor the
>>> KafkaBrokers configuration :(
>>>
>>> Piotrek
>>>
>>> wt., 4 sie 2020 o 22:04 Nick Bendtner  napisał(a):
>>>
 Hi,
 I don't observe this behaviour though, we use flink 1.7.2 . I stopped
 kafka and zookeeper on all broker nodes. On the flink side, I see the
 messages in the log ( data is obfuscated) . There are no error logs. The
 kafka consumer properties are

 1. "bootstrap.servers"

 2. "zookeeper.connect

 3. "auto.offset.reset"

 4. "group.id"

 5."security.protocol"


 The flink consumer starts consuming data as soon as the kafka comes
 back up. So I want to know in what scenario/kafka consumer config will the
 job go to failed state after a finite number of restart attempts from
 checkpoint.


 TM log.
 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
  - [Consumer clientId=consumer-5,
 groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
 yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
 Broker may not be available.
 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
  - [Consumer clientId=consumer-4,
 groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
 yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established.
 Broker may not be available.
 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
  - [Consumer clientId=consumer-4,
 groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
 yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established.
 Broker may not be available.
 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
  - [Consumer clientId=consumer-6,
 groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
 yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
 Broker may not be available.

 Best,
 Nick

 On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek 
 wrote:

> Hi,
>
> Flink doesn't do any special failure-handling or retry logic, so it’s
> up
> to how the KafkaConsumer is configured via properties. In general
> Flink
> doesn’t try to be smart: when something fails an exception fill bubble
> up that will fail this execution of the job. If checkpoints are
> enabled
> this will trigger a restore, this is controlled by the restart
> strategy.
> If that eventually gives up the job fill go to “FAILED” and stop.
>
> This is the relevant section of the docs:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
>
> Best,
> Aljoscha
>
> On 15.07.20 17:42, Nick Bendtner wrote:
> > Hi guys,
> > I want to know what is the default behavior of Kafka source when a
> kafka
> > cluster goes down during streaming. Will the job status go to
> failing or is
> > the exception caught and there is a back off before the source tries
> to

Re: Status of a job when a kafka source dies

2020-08-05 Thread Nick Bendtner
+user group.

On Wed, Aug 5, 2020 at 12:57 PM Nick Bendtner  wrote:

> Thanks Piotr but shouldn't this event be handled by the FlinkKafkaConsumer
> since the poll happens inside the FlinkKafkaConsumer. How can I catch this
> event in my code since I don't have control over the poll.
>
> Best,
> Nick.
>
> On Wed, Aug 5, 2020 at 12:14 PM Piotr Nowojski 
> wrote:
>
>> Hi Nick,
>>
>> What Aljoscha was trying to say is that Flink is not trying to do any
>> magic. If `KafkaConsumer` - which is being used under the hood of
>> `FlinkKafkaConsumer` connector - throws an exception, this
>> exception bubbles up causing the job to failover. If the failure is handled
>> by the `KafkaConsumer` silently, that's what's happening. As we can in the
>> TM log that you attached, the latter seems to be happening - note that the
>> warning is logged by "org.apache.kafka.clients.NetworkClient" package, so
>> that's not the code we (Flink developers) control.
>>
>> If you want to change this behaviour, unless someone here on this mailing
>> list just happens to know the answer, the better place to ask such a
>> question on the Kafka mailing list. Maybe there is some way to configure
>> this.
>>
>> And sorry I don't know much about neither the KafkaConsumer nor the
>> KafkaBrokers configuration :(
>>
>> Piotrek
>>
>> On Tue, Aug 4, 2020 at 22:04, Nick Bendtner wrote:
>>
>>> Hi,
>>> I don't observe this behaviour though, we use flink 1.7.2 . I stopped
>>> kafka and zookeeper on all broker nodes. On the flink side, I see the
>>> messages in the log ( data is obfuscated) . There are no error logs. The
>>> kafka consumer properties are
>>>
>>> 1. "bootstrap.servers"
>>>
>>> 2. "zookeeper.connect
>>>
>>> 3. "auto.offset.reset"
>>>
>>> 4. "group.id"
>>>
>>> 5."security.protocol"
>>>
>>>
>>> The flink consumer starts consuming data as soon as the kafka comes back
>>> up. So I want to know in what scenario/kafka consumer config will the job
>>> go to failed state after a finite number of restart attempts from
>>> checkpoint.
>>>
>>>
>>> TM log.
>>> 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-5,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
>>> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
>>> Broker may not be available.
>>> 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-4,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
>>> yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established.
>>> Broker may not be available.
>>> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-4,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
>>> yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established.
>>> Broker may not be available.
>>> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-6,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
>>> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
>>> Broker may not be available.
>>>
>>> Best,
>>> Nick
>>>
>>> On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek 
>>> wrote:
>>>
 Hi,

 Flink doesn't do any special failure-handling or retry logic, so it’s
 up
 to how the KafkaConsumer is configured via properties. In general Flink
 doesn’t try to be smart: when something fails an exception fill bubble
 up that will fail this execution of the job. If checkpoints are enabled
 this will trigger a restore, this is controlled by the restart
 strategy.
 If that eventually gives up the job fill go to “FAILED” and stop.

 This is the relevant section of the docs:

 https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html

 Best,
 Aljoscha

 On 15.07.20 17:42, Nick Bendtner wrote:
 > Hi guys,
 > I want to know what is the default behavior of Kafka source when a
 kafka
 > cluster goes down during streaming. Will the job status go to failing
 or is
 > the exception caught and there is a back off before the source tries
 to
 > poll for more events ?
 >
 >
 > Best,
 > Nick.
 >




Re: Status of a job when a kafka source dies

2020-08-05 Thread Piotr Nowojski
Hi Nick,

What Aljoscha was trying to say is that Flink is not trying to do any
magic. If the `KafkaConsumer` - which is used under the hood of the
`FlinkKafkaConsumer` connector - throws an exception, this exception
bubbles up and causes the job to failover. If the failure is handled by the
`KafkaConsumer` silently, then that's what happens. As we can see in the TM
log that you attached, the latter seems to be happening - note that the
warning is logged by the "org.apache.kafka.clients.NetworkClient" package,
so that's not code we (Flink developers) control.

If you want to change this behaviour, unless someone here on this mailing
list happens to know the answer, the better place to ask such a question is
the Kafka mailing list. Maybe there is some way to configure this.

And sorry, I don't know much about either the KafkaConsumer or the Kafka
brokers' configuration :(

Piotrek

On Tue, Aug 4, 2020 at 22:04, Nick Bendtner wrote:

> Hi,
> I don't observe this behaviour though, we use flink 1.7.2 . I stopped
> kafka and zookeeper on all broker nodes. On the flink side, I see the
> messages in the log ( data is obfuscated) . There are no error logs. The
> kafka consumer properties are
>
> 1. "bootstrap.servers"
>
> 2. "zookeeper.connect
>
> 3. "auto.offset.reset"
>
> 4. "group.id"
>
> 5."security.protocol"
>
>
> The flink consumer starts consuming data as soon as the kafka comes back
> up. So I want to know in what scenario/kafka consumer config will the job
> go to failed state after a finite number of restart attempts from
> checkpoint.
>
>
> TM log.
> 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-5,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
> may not be available.
> 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-4,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
> yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established. Broker
> may not be available.
> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-4,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
> yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established. Broker
> may not be available.
> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-6,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
> may not be available.
>
> Best,
> Nick
>
> On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> Flink doesn't do any special failure-handling or retry logic, so it’s up
>> to how the KafkaConsumer is configured via properties. In general Flink
>> doesn’t try to be smart: when something fails an exception fill bubble
>> up that will fail this execution of the job. If checkpoints are enabled
>> this will trigger a restore, this is controlled by the restart strategy.
>> If that eventually gives up the job fill go to “FAILED” and stop.
>>
>> This is the relevant section of the docs:
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
>>
>> Best,
>> Aljoscha
>>
>> On 15.07.20 17:42, Nick Bendtner wrote:
>> > Hi guys,
>> > I want to know what is the default behavior of Kafka source when a kafka
>> > cluster goes down during streaming. Will the job status go to failing
>> or is
>> > the exception caught and there is a back off before the source tries to
>> > poll for more events ?
>> >
>> >
>> > Best,
>> > Nick.
>> >
>>
>>


Re: Kafka transaction error lead to data loss under end to end exact-once

2020-08-05 Thread Piotr Nowojski
Hi Lu,

In this case, judging from the quite fragmented log/error message that you
posted, the job has failed, so Flink indeed detected some issue, and that
probably means data loss in Kafka (in such a case you could probably
recover some of the lost records by reading from Kafka in `read_uncommitted`
mode, but that can lead to data duplication).

However, a very similar error can be logged by Flink as a WARN during
recovery. In that case it can mean either:
- data loss because of timeouts (keep in mind that the Kafka transaction
timeout must cover: checkpoint interval + downtime during the failure +
time to restart and recover the Flink job), or
- the transaction was already committed, just before the failure happened,

and there is unfortunately no way to distinguish those two cases using the
Kafka API.
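
For reference, a rough sketch of raising the transaction timeout on the
producer side (untested; the exact FlinkKafkaProducer constructor overloads
differ between Flink versions, and the topic, servers and values here are
placeholders):

   Properties producerProps = new Properties();
   producerProps.setProperty("bootstrap.servers", "broker:9092");
   // Must not exceed transaction.max.timeout.ms on the brokers (broker default: 15 min)
   // and should cover: checkpoint interval + downtime during the failure + recovery time.
   producerProps.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

   FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
         "output-topic",
         new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
         producerProps,
         FlinkKafkaProducer.Semantic.EXACTLY_ONCE);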

Piotrek


On Wed, Aug 5, 2020 at 10:17, Khachatryan Roman wrote:

> Hi Lu,
>
> AFAIK, it's not going to be fixed. As you mentioned in the first email,
> Kafka should be configured so that it's transaction timeout is less than
> your max checkpoint duration.
>
> However, you should not only change transaction.timeout.ms in producer
> but also transaction.max.timeout.ms on your brokers.
> Please refer to
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#caveats
>
> Regards,
> Roman
>
>
> On Wed, Aug 5, 2020 at 12:24 AM Lu Niu  wrote:
>
>> Hi, Khachatryan
>>
>> Thank you for the reply. Is that a problem that can be fixed? If so, is
>> the fix on roadmap? Thanks!
>>
>> Best
>> Lu
>>
>> On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Lu,
>>>
>>> Yes, this error indicates data loss (unless there were no records in the
>>> transactions).
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu  wrote:
>>>
 Hi,

 We are using end to end exact-once flink + kafka and
 encountered belowing exception which usually came after checkpoint 
 failures:
 ```














 *Caused by: org.apache.kafka.common.errors.ProducerFencedException:
 Producer attempted an operation with an old epoch. Either there is a newer
 producer with the same transactionalId, or the producer's transaction has
 been expired by the broker.2020-07-28 16:27:51,633 INFO
  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job xxx
 (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to
 FAILING.java.lang.RuntimeException: Error while confirming checkpoint at
 org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219) at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
 java.util.concurrent.FutureTask.run(FutureTask.java:266) at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)Caused by:
 org.apache.flink.util.FlinkRuntimeException: Committing one of transactions
 failed, logging first encountered failure at
 org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
 at
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
 at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214) ... 5
 more*
 ```
 We did some end to end tests and noticed whenever such a thing happens,
 there will be a data loss.

 Referring to several related questions, I understand I need to increase
 `transaction.timeout.ms`  because:
 ```
 *Semantic.EXACTLY_ONCE mode relies on the ability to commit
 transactions that were started before taking a checkpoint, after recovering
 from the said checkpoint. If the time between Flink application crash and
 completed restart is larger than Kafka’s transaction timeout there will be
 data loss (Kafka will automatically abort transactions that exceeded
 timeout time).*
 ```

 But I want to confirm with the community that:
 *Does an exception like this will always lead to data loss? *

 I asked because we get this exception sometimes even when the
 checkpoint succeeds.

 Setup:
 Flink 1.9.1

 Best
 Lu

>>>


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Eleanore Jin
Hi Yang and Till,

Thanks a lot for the help! I have a similar question to the one Till
mentioned: if we do not fail the Flink pods when the restart strategy is
exhausted, it might be hard to monitor such failures. Today I get alerts if
the k8s pods are restarted or in a crash loop, but if that is no longer the
case, how can we handle the monitoring? In production, I have hundreds of
small flink jobs running (2-8 TM pods) doing stateless processing, and it is
really hard for us to expose an ingress for each JM REST endpoint to
periodically query the job status of each flink job.

Thanks a lot!
Eleanore

On Wed, Aug 5, 2020 at 4:56 AM Till Rohrmann  wrote:

> You are right Yang Wang.
>
> Thanks for creating this issue.
>
> Cheers,
> Till
>
> On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:
>
>> Actually, the application status shows in YARN web UI is not determined
>> by the jobmanager process exit code.
>> Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
>> control the final status of YARN application.
>> So although jobmanager exit with zero code, it still could show failed
>> status in YARN web UI.
>>
>> I have created a ticket to track this improvement[1].
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-18828
>>
>>
>> Best,
>> Yang
>>
>>
>> Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:
>>
>>> Yes for the other deployments it is not a problem. A reason why people
>>> preferred non-zero exit codes in case of FAILED jobs is that this is easier
>>> to monitor than having to take a look at the actual job result. Moreover,
>>> in the YARN web UI the application shows as failed if I am not mistaken.
>>> However, from a framework's perspective, a FAILED job does not mean that
>>> Flink has failed and, hence, the return code could still be 0 in my opinion.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>>>
 Hi Eleanore,

 Yes, I suggest to use Job to replace Deployment. It could be used
 to run jobmanager one time and finish after a successful/failed completion.

 However, using Job still could not solve your problem completely. Just
 as Till said, When a job exhausts the restart strategy, the jobmanager
 pod will terminate with non-zero exit code. It will cause the K8s
 restarting it again. Even though we could set the resartPolicy and
 backoffLimit,
 this is not a clean and correct way to go. We should terminate the
 jobmanager process with zero exit code in such situation.

 @Till Rohrmann  I just have one concern. Is it a
 special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
 terminating with
 non-zero exit code is harmless.


 Best,
 Yang

 Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:

> Hi Yang & Till,
>
> Thanks for your prompt reply!
>
> Yang, regarding your question, I am actually not using k8s job, as I
> put my app.jar and its dependencies under flink's lib directory. I have 1
> k8s deployment for job manager, and 1 k8s deployment for task manager, and
> 1 k8s service for job manager.
>
> As you mentioned above, if flink job is marked as failed, it will
> cause the job manager pod to be restarted. Which is not the ideal
> behavior.
>
> Do you suggest that I should change the deployment strategy from using
> k8s deployment to k8s job? In case the flink program exit with non-zero
> code (e.g. exhausted number of configured restart), pod can be marked as
> complete hence not restarting the job again?
>
> Thanks a lot!
> Eleanore
>
> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang 
> wrote:
>
>> @Till Rohrmann  In native mode, when a Flink
>> application terminates with FAILED state, all the resources will be 
>> cleaned
>> up.
>>
>> However, in standalone mode, I agree with you that we need to rethink
>> the exit code of Flink. When a job exhausts the restart
>> strategy, we should terminate the pod and do not restart again. After
>> googling, it seems that we could not specify the restartPolicy
>> based on exit code[1]. So maybe we need to return a zero exit code to
>> avoid restarting by K8s.
>>
>> [1].
>> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>>
>> Best,
>> Yang
>>
>> Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:
>>
>>> @Yang Wang  I believe that we should
>>> rethink the exit codes of Flink. In general you want K8s to restart a
>>> failed Flink process. Hence, an application which terminates in state
>>> FAILED should not return a non-zero exit code because it is a valid
>>> termination state.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang 
>>> wrote:
>>>
 Hi Eleanore,

 I think you are using K8s resource "Job" to 

Re: UDF:Type is not supported: ANY

2020-08-05 Thread Benchao Li
Hi zilong,

The SQL ARRAY type corresponds to the legacy types Types.PRIMITIVE_ARRAY or
Types.OBJECT_ARRAY; type information of any other kind is treated as the ANY
type.
This should have nothing to do with generics; the implementation simply does
not map Types.LIST(Types.STRING) to SQL's ARRAY type.
Support for using a List as ARRAY data is only planned for 1.12 [1].

[1] https://issues.apache.org/jira/browse/FLINK-18417
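
For illustration, a rough sketch of the array-based workaround (returning
String[] so that the result maps to the SQL ARRAY type; untested, and the
class name is just an example):

   public class Json2Array extends ScalarFunction {

      private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

      public String[] eval(String param) {
         try {
            List<Map<String, Object>> list =
                  OBJECT_MAPPER.readValue(param, new TypeReference<List<Map<String, Object>>>() {});
            String[] result = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
               // keep each element as its JSON string representation
               result[i] = OBJECT_MAPPER.writeValueAsString(list.get(i));
            }
            return result;
         } catch (Exception e) {
            return new String[0];
         }
      }

      @Override
      public TypeInformation<?> getResultType(Class<?>[] signature) {
         // OBJECT_ARRAY(STRING) is one of the legacy types that map to SQL ARRAY<STRING>
         return Types.OBJECT_ARRAY(Types.STRING);
      }
   }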

zilong xiao wrote on Mon, Aug 3, 2020 at 8:23 PM:

> That doesn't work, I tried it. Even iterating over the map and calling
> toString on every entry fails. I suspect the cause is Java's generics
> mechanism (type erasure); I wonder how the community sees this problem.
>
> godfrey he wrote on Mon, Aug 3, 2020 at 7:50 PM:
>
> > Try changing the type parameters of the Map you use
> >
> > zilong xiao wrote on Mon, Aug 3, 2020 at 4:56 PM:
> >
> > > For now an array works as a replacement for List, but Map still cannot
> > > be made to run successfully
> > >
> > > zilong xiao wrote on Mon, Aug 3, 2020 at 10:43 AM:
> > >
> > > > While writing Flink SQL jobs recently I found that the built-in
> > > > functions do not quite cover my needs; very common helpers such as
> > > > Json2Array and Json2Map do not seem to be provided, so they have to
> > > > be implemented as UDFs. When I tried to implement them with Jackson,
> > > > SQL validation always fails with `Type is not supported: ANY`. My
> > > > guess is that this is related to Java generics: since generics are
> > > > erased to Object at runtime, could that be what triggers the
> > > > exception? Flink itself has a string-to-container function,
> > > > STR_TO_MAP, whose implementation is written in Scala, so I am not
> > > > sure whether the exception is really caused by generics. If it is,
> > > > how should a Json2Array / Json2Map UDF be written in Java? Any
> > > > guidance is appreciated.
> > > >
> > > > The UDF code is as follows:
> > > >
> > > > public class Json2List extends ScalarFunction {
> > > >
> > > >    private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);
> > > >
> > > >    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
> > > >       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > >       .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true);
> > > >
> > > >    public Json2List(){}
> > > >
> > > >    public List<String> eval(String param) {
> > > >       List<String> result = new ArrayList<>();
> > > >       try {
> > > >          List<Map<String, Object>> list = OBJECT_MAPPER.readValue(param, List.class);
> > > >          for (Map<String, Object> map : list) {
> > > >             result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > >          }
> > > >          return result;
> > > >       } catch (JsonProcessingException e) {
> > > >          LOG.error("failed to convert json to array, param is: {}", param, e);
> > > >       }
> > > >       return result;
> > > >    }
> > > >
> > > >    @Override
> > > >    public TypeInformation<List<String>> getResultType(Class[] signature) {
> > > >       return Types.LIST(Types.STRING);
> > > >    }
> > > > }
> > > >
> > > >
> > >
> >
>


-- 

Best,
Benchao Li


Re: Flink Mysql sink: sharding databases/tables by time

2020-08-05 Thread Leonard Xu
Hi

As I understand it, besides being able to specify the table name, the key point
is creating the tables automatically in the database. There is a related JDBC
issue I have been following [1], but the code has not been merged yet, so for
now there is no good solution. The Elasticsearch connector does support a
similar feature; if the data can live in ES you could give that a try.

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-16294 



> On Aug 5, 2020, at 20:36, 张健 wrote:
> 
> 
> 
> Hi all:
> 
> 
> I would like to ask: does the Flink MySQL sink currently have any
> configuration option for sharding databases/tables by time?
> 
> 
> My requirement is that the source data goes through a simple ETL step and is
> written to MySQL for real-time business queries. The business only queries
> the current day's data (historical data is handled by a separate offline
> module), so I would like each day's data to be written to its own table
> (named like xx_MMDD). But the Flink JDBC sink currently requires a fixed
> table name. Is there a simple way to implement this, or a better approach
> for such a scenario?
> 
> 
> Thanks.
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> --
> 
> 张健



Re: Re: Re: FLINK SQL view data reuse question

2020-08-05 Thread kandy.wang






@godfrey
The StatementSet submission approach you mentioned is not supported when
submitting jobs through sql-client, right? Could support for it be added?
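
For reference, a rough sketch of that StatementSet API in the 1.11 Table API
(untested; the sink tables and queries are just placeholders):

   TableEnvironment tEnv = TableEnvironment.create(
         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

   StatementSet stmtSet = tEnv.createStatementSet();
   stmtSet.addInsertSql("INSERT INTO day_sink SELECT order_date, SUM(amount) FROM order_source GROUP BY order_date");
   stmtSet.addInsertSql("INSERT INTO hour_sink SELECT order_hour, SUM(amount) FROM order_source GROUP BY order_hour");

   // Print the optimized plan to see which parts (source, Deduplicate, ...) are shared.
   System.out.println(stmtSet.explain());

   // Submit both INSERTs as a single job so the common parts of the view can be reused.
   stmtSet.execute();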











On 2020-08-04 19:36:56, "godfrey he" wrote:
>Call StatementSet#explain() and print the result to check whether reuse
>fails because the Deduplicate nodes have different digests.
>
>kandy.wang wrote on Tue, Aug 4, 2020 at 6:21 PM:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> @godfrey
>> Thanks. I just tried it: for source -> Deduplicate -> GlobalGroupAggregate,
>> the source is indeed reused, but the Deduplicate node is not. In theory both
>> source and Deduplicate belong to the view's logic and should be reused as
>> well; it feels like not enough is being reused.
>>
>>
>> On 2020-08-04 17:26:02, "godfrey he" wrote:
>> >The blink planner can optimize multi-sink queries so that common
>> >computation is reused as much as possible.
>> >In 1.11, submit the job with StatementSet; before 1.11, use
>> >sqlUpdate/insertInto + execute.
>> >
>> >kandy.wang wrote on Tue, Aug 4, 2020 at 5:20 PM:
>> >
>> >> A question about FLINK SQL views:
>> >> create view order_source
>> >>
>> >> as
>> >>
>> >> select order_id, order_goods_id, user_id,...
>> >>
>> >> from (
>> >>
>> >> ..  proctime,row_number() over(partition by order_id,
>> >> order_goods_id order by proctime desc) as rownum
>> >>
>> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
>> properties.group.id'='flink_etl_kafka_hbase',
>> >> 'scan.startup.mode'='latest-offset') */
>> >>
>> >> ) where  rownum = 1 and  price > 0;
>> >>
>> >>
>> >>
>> >>
>> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
>> as
>> >> BIGINT),)
>> >>
>> >> from
>> >>
>> >> (
>> >>
>> >> select order_date as rowkey,
>> >>
>> >> sum(amount) as saleN,
>> >>
>> >> from order_source
>> >>
>> >> group by order_date
>> >>
>> >> );
>> >>
>> >>
>> >>
>> >>
>> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
>> as
>> >> BIGINT))
>> >>
>> >> from
>> >>
>> >> (
>> >>
>> >> select order_hour as rowkey,sum(amount) as saleN,
>> >>
>> >>
>> >>
>> >> from order_source
>> >>
>> >> group by order_hour
>> >>
>> >> );
>> >> Question: the same view, the same consumer group, and different sinks
>> >> produce 2 jobs, which means the 2 jobs share one consumer group. The
>> >> resulting jobs are: a. order_source -> sink 1  b. order_source -> sink 2
>> >>
>> >>
>> >> The intent was to reuse the same full source data through the view
>> >> order_source (the view has to deduplicate the order data), so that the
>> >> same state can be reused underneath. How can this be achieved?
>> >>
>> >>
>>


Flink Mysql sink: sharding databases/tables by time

2020-08-05 Thread 张健



Hi all:


I would like to ask: does the Flink MySQL sink currently have any
configuration option for sharding databases/tables by time?


My requirement is that the source data goes through a simple ETL step and is
written to MySQL for real-time business queries. The business only queries the
current day's data (historical data is handled by a separate offline module),
so I would like each day's data to be written to its own table (named like
xx_MMDD). But the Flink JDBC sink currently requires a fixed table name. Is
there a simple way to implement this, or a better approach for such a scenario?


Thanks.










--

张健

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Till Rohrmann
You are right Yang Wang.

Thanks for creating this issue.

Cheers,
Till

On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:

> Actually, the application status shows in YARN web UI is not determined by
> the jobmanager process exit code.
> Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
> control the final status of YARN application.
> So although jobmanager exit with zero code, it still could show failed
> status in YARN web UI.
>
> I have created a ticket to track this improvement[1].
>
> [1]. https://issues.apache.org/jira/browse/FLINK-18828
>
>
> Best,
> Yang
>
>
> Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:
>
>> Yes for the other deployments it is not a problem. A reason why people
>> preferred non-zero exit codes in case of FAILED jobs is that this is easier
>> to monitor than having to take a look at the actual job result. Moreover,
>> in the YARN web UI the application shows as failed if I am not mistaken.
>> However, from a framework's perspective, a FAILED job does not mean that
>> Flink has failed and, hence, the return code could still be 0 in my opinion.
>>
>> Cheers,
>> Till
>>
>> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>>
>>> Hi Eleanore,
>>>
>>> Yes, I suggest to use Job to replace Deployment. It could be used to run
>>> jobmanager one time and finish after a successful/failed completion.
>>>
>>> However, using Job still could not solve your problem completely. Just
>>> as Till said, When a job exhausts the restart strategy, the jobmanager
>>> pod will terminate with non-zero exit code. It will cause the K8s
>>> restarting it again. Even though we could set the resartPolicy and
>>> backoffLimit,
>>> this is not a clean and correct way to go. We should terminate the
>>> jobmanager process with zero exit code in such situation.
>>>
>>> @Till Rohrmann  I just have one concern. Is it a
>>> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
>>> terminating with
>>> non-zero exit code is harmless.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>>>
 Hi Yang & Till,

 Thanks for your prompt reply!

 Yang, regarding your question, I am actually not using k8s job, as I
 put my app.jar and its dependencies under flink's lib directory. I have 1
 k8s deployment for job manager, and 1 k8s deployment for task manager, and
 1 k8s service for job manager.

 As you mentioned above, if flink job is marked as failed, it will cause
 the job manager pod to be restarted. Which is not the ideal behavior.

 Do you suggest that I should change the deployment strategy from using
 k8s deployment to k8s job? In case the flink program exit with non-zero
 code (e.g. exhausted number of configured restart), pod can be marked as
 complete hence not restarting the job again?

 Thanks a lot!
 Eleanore

 On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:

> @Till Rohrmann  In native mode, when a Flink
> application terminates with FAILED state, all the resources will be 
> cleaned
> up.
>
> However, in standalone mode, I agree with you that we need to rethink
> the exit code of Flink. When a job exhausts the restart
> strategy, we should terminate the pod and do not restart again. After
> googling, it seems that we could not specify the restartPolicy
> based on exit code[1]. So maybe we need to return a zero exit code to
> avoid restarting by K8s.
>
> [1].
> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>
> Best,
> Yang
>
> Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:
>
>> @Yang Wang  I believe that we should
>> rethink the exit codes of Flink. In general you want K8s to restart a
>> failed Flink process. Hence, an application which terminates in state
>> FAILED should not return a non-zero exit code because it is a valid
>> termination state.
>>
>> Cheers,
>> Till
>>
>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> I think you are using K8s resource "Job" to deploy the jobmanager.
>>> Please set .spec.template.spec.restartPolicy = "Never" and
>>> spec.backoffLimit = 0.
>>> Refer here[1] for more information.
>>>
>>> Then, when the jobmanager failed because of any reason, the K8s job
>>> will be marked failed. And K8s will not restart the job again.
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 12:05 AM:
>>>
 Hi Till,

 Thanks for the reply!

 I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
 Specifically, I build a custom docker image, which I copied the app jar
 (not uber jar) and 


Re: flink sql eos

2020-08-05 Thread Leonard Xu
Hi

> Currently only Kafka implements TwoPhaseCommitSinkFunction, and the Kafka
> DDL has no property for setting the Semantic to EXACTLY_ONCE

Besides Kafka, the filesystem connector also supports EXACTLY_ONCE; for the
Kafka DDL this is supported in 1.12 [1]


> When global EXACTLY_ONCE is enabled and all connectors in use support
> EXACTLY_ONCE, can the whole application achieve end-to-end exactly-once
> consistency?

Yes.
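
For illustration, the "global EXACTLY_ONCE" part in code (a minimal sketch;
the checkpoint interval is just a placeholder):

   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   // Exactly-once checkpoints are the prerequisite on the framework side.
   env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
   StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
   // Queries registered on tEnv now run with exactly-once checkpoints;
   // end-to-end exactly-once additionally requires every sink in use to
   // support it (e.g. the Kafka and filesystem connectors mentioned above).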

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-15221 


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Yang Wang
Actually, the application status shown in the YARN web UI is not determined by
the jobmanager process exit code.
Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
control the final status of the YARN application.
So even though the jobmanager exits with a zero code, it could still show a
failed status in the YARN web UI.

I have created a ticket to track this improvement[1].

[1]. https://issues.apache.org/jira/browse/FLINK-18828


Best,
Yang


Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:

> Yes for the other deployments it is not a problem. A reason why people
> preferred non-zero exit codes in case of FAILED jobs is that this is easier
> to monitor than having to take a look at the actual job result. Moreover,
> in the YARN web UI the application shows as failed if I am not mistaken.
> However, from a framework's perspective, a FAILED job does not mean that
> Flink has failed and, hence, the return code could still be 0 in my opinion.
>
> Cheers,
> Till
>
> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>
>> Hi Eleanore,
>>
>> Yes, I suggest to use Job to replace Deployment. It could be used to run
>> jobmanager one time and finish after a successful/failed completion.
>>
>> However, using Job still could not solve your problem completely. Just as
>> Till said, When a job exhausts the restart strategy, the jobmanager
>> pod will terminate with non-zero exit code. It will cause the K8s
>> restarting it again. Even though we could set the resartPolicy and
>> backoffLimit,
>> this is not a clean and correct way to go. We should terminate the
>> jobmanager process with zero exit code in such situation.
>>
>> @Till Rohrmann  I just have one concern. Is it a
>> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
>> terminating with
>> non-zero exit code is harmless.
>>
>>
>> Best,
>> Yang
>>
>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>>
>>> Hi Yang & Till,
>>>
>>> Thanks for your prompt reply!
>>>
>>> Yang, regarding your question, I am actually not using k8s job, as I put
>>> my app.jar and its dependencies under flink's lib directory. I have 1 k8s
>>> deployment for job manager, and 1 k8s deployment for task manager, and 1
>>> k8s service for job manager.
>>>
>>> As you mentioned above, if flink job is marked as failed, it will cause
>>> the job manager pod to be restarted. Which is not the ideal behavior.
>>>
>>> Do you suggest that I should change the deployment strategy from using
>>> k8s deployment to k8s job? In case the flink program exit with non-zero
>>> code (e.g. exhausted number of configured restart), pod can be marked as
>>> complete hence not restarting the job again?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:
>>>
 @Till Rohrmann  In native mode, when a Flink
 application terminates with FAILED state, all the resources will be cleaned
 up.

 However, in standalone mode, I agree with you that we need to rethink
 the exit code of Flink. When a job exhausts the restart
 strategy, we should terminate the pod and do not restart again. After
 googling, it seems that we could not specify the restartPolicy
 based on exit code[1]. So maybe we need to return a zero exit code to
 avoid restarting by K8s.

 [1].
 https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code

 Best,
 Yang

 Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:

> @Yang Wang  I believe that we should
> rethink the exit codes of Flink. In general you want K8s to restart a
> failed Flink process. Hence, an application which terminates in state
> FAILED should not return a non-zero exit code because it is a valid
> termination state.
>
> Cheers,
> Till
>
> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang 
> wrote:
>
>> Hi Eleanore,
>>
>> I think you are using K8s resource "Job" to deploy the jobmanager.
>> Please set .spec.template.spec.restartPolicy = "Never" and
>> spec.backoffLimit = 0.
>> Refer here[1] for more information.
>>
>> Then, when the jobmanager failed because of any reason, the K8s job
>> will be marked failed. And K8s will not restart the job again.
>>
>> [1].
>> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>>
>>
>> Best,
>> Yang
>>
>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 12:05 AM:
>>
>>> Hi Till,
>>>
>>> Thanks for the reply!
>>>
>>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>>> Specifically, I build a custom docker image, which I copied the app jar
>>> (not uber jar) and all its dependencies under /flink/lib.
>>>
>>> So my question is more like, in this case, if the job is marked as
>>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>>> what are the suggestions for such scenario?
>>>
>>> 

Re: Two Queries and a Kafka Topic

2020-08-05 Thread Theo Diefenthal
Hi Marco, 

In general, I see three solutions you could take here: 

1. Use the State Processor API: You can run a program with the State Processor 
API that loads the data from JDBC and stores it into a Flink savepoint. 
Afterwards, you start your streaming job from that savepoint, which will load 
its state and therefore already contain all the data from JDBC. 
2. Load on the master, distribute with the job: When you build up your job 
graph, you could execute the JDBC queries and put the result into some 
Serializable class which you then plug into an operator in your stream (e.g. a 
map stage). The class, along with all the queried data, will be serialized and 
deserialized on the taskmanagers. (Usually I use this for configuration 
parameters, but it might be ok in this case as well.) 
3. Load on the TaskManager: In your map function, when the very first event is 
received, you can block processing and synchronously load the data from JDBC 
(so each TaskManager performs the JDBC query itself). You then keep the data to 
be used for all subsequent map calls; a sketch follows below. 

I think option 3 is the easiest to implement, while option 1 might be the most 
elegant way, in my opinion. 
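
To make option 3 concrete, a rough, untested sketch of a RichMapFunction that 
loads the reference data once per task in open() and reuses it for every record 
(the JDBC URL, credentials, query and column names are placeholders): 

   public class ReferenceDataMapper extends RichMapFunction<String, String> {

      private transient Map<String, String> referenceData;

      @Override
      public void open(Configuration parameters) throws Exception {
         // Each parallel task loads the reference data once, before processing any events.
         referenceData = new HashMap<>();
         try (Connection conn = DriverManager.getConnection(
                  "jdbc:postgresql://host:5432/db", "user", "password");
              Statement stmt = conn.createStatement();
              ResultSet rs = stmt.executeQuery("SELECT k, v FROM reference_table")) {
            while (rs.next()) {
               referenceData.put(rs.getString("k"), rs.getString("v"));
            }
         }
      }

      @Override
      public String map(String value) {
         // Use the pre-loaded reference data in the business logic; a plain lookup here.
         return referenceData.getOrDefault(value, value);
      }
   }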

Best regards 
Theo 


Von: "Marco Villalobos"  
An: "Leonard Xu"  
CC: "user"  
Gesendet: Mittwoch, 5. August 2020 04:33:23 
Betreff: Re: Two Queries and a Kafka Topic 

Hi Leonard, 

First, Thank you. 

I am currently trying to restrict my solution to Apache Flink 1.10 because it's 
the current version supported by Amazon EMR. 
I am not ready to change our operational environment to solve this. 

Second, I am using the DataStream API. The Kafka Topic is not in a table, it is 
in a DataStream. 

The SQL queries are literally from a PostgresSQL database, and only need to be 
run exactly once in the lifetime of the job. 

I am struggling to determine where this happens. 

JDBCInputFormat seems to query the SQL table repetitively, and also connecting 
streams and aggregating into one object is very complicated. 

Thus, I am wondering what is the right approach. 

Let me restate the parameters. 

SQL Query One = data in PostgreSQL (200K records) that is used for business 
logic. 
SQL Query Two = data in PostgreSQL (1000 records) that is used for business 
logic. 
Kafka Topic One = unlimited data-stream that uses the data-stream api and 
queries above to write into multiple sinks 

ASCII Diagram: 

[ SQL Query One ] ---> [Aggregate to Map] 

Kafka ---> [Kafka Topic One] ---> [Keyed Process Function (Query One Map, Query Two Map)] ---> [Multiple Sinks] 

[ SQL Query Two ] ---> [Aggregate to Map] 


Maybe my graph above helps. You see, I need Query One and Query Two only ever 
execute once. After that the information they provide are used to correctly 
process the Kafka Topic. 

I'll take a deep further to try and understand what you said, thank you, but 
JDBCInputFormat seem to repetitively query the database. Maybe I need to write 
a RichFunction or AsyncIO function and cache the results in state after that. 






On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote: 

Hi, Marco 



If I need SQL Query One and SQL Query Two to happen just one time, 



Looks like you want to reuse this kafka table in one job, It’s supported to 
execute multiple query in one sql job in Flink 1.11. 
You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a 
single SQL job[1]. 


Best 
Leonard 
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement 




On Aug 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com> wrote: 

Lets say that I have: 

SQL Query One from data in PostgreSQL (200K records). 
SQL Query Two from data in PostgreSQL (1000 records). 
and Kafka Topic One. 

Let's also say that main data from this Flink job arrives in Kafka Topic One. 

If I need SQL Query One and SQL Query Two to happen just one time, when the job 
starts up, and afterwards maybe store it in Keyed State or Broadcast State, but 
it's not really part of the stream, then what is the best practice for 
supporting that in Flink 

The Flink job needs to stream data from Kafka Topic One, aggregate it, and 
perform computations that require all of the data in SQL Query One and SQL 
Query Two to perform its business logic. 

I am using Flink 1.10. 

I supposed to query the database before the Job I submitted, and then pass it 
on as parameters to a function? 
Or am I supposed to use JDBCInputFormat for both queries and create two 
streams, and somehow connect or broadcast both of them two the main stream that 
uses Kafka Topic One? 

I would appreciate guidance. Please. Thank you. 

Sincerely, 

Marco A. Villalobos 




Re: The bytecode of the class does not match the source code

2020-08-05 Thread Chesnay Schepler
Well of course these differ; on the left you have the decompiled 
bytecode, on the right the original source.


If these were the same you wouldn't need source jars.

On 05/08/2020 12:20, 魏子涵 wrote:
I'm sure the two versions match up. Following is the pic comparing 
codes in IDEA

https://img-blog.csdnimg.cn/20200805180232929.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NTRE5yaG1t,size_16,color_FF,t_70






At 2020-08-05 16:46:11, "Chesnay Schepler"  wrote:

Please make sure you have loaded the correct source jar, and
aren't by chance still using the 1.11.0 source jar.

On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
      I found  the
【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class
in【flink-runtime_2.11-1.11.1.jar】does not match the source code.
Is it a problem we need to fix(if it is, what should we do)? or
just let it go?










Re: The bytecode of the class does not match the source code

2020-08-05 Thread Jake

hi 魏子涵

The code IDEA decompiles does not match the Java source code; you can download 
the Java source code in IDEA.

/Volumes/work/maven_repository/org/apache/flink/flink-runtime_2.11/1.10.1/flink-runtime_2.11-1.10.1-sources.jar!/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java

Jake


> On Aug 5, 2020, at 6:20 PM, 魏子涵  wrote:
> 
> I'm sure the two versions match up. Following is the pic comparing codes in 
> IDEA
> https://img-blog.csdnimg.cn/20200805180232929.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NTRE5yaG1t,size_16,color_FF,t_70
> 
> 
> 
> 
> 
> At 2020-08-05 16:46:11, "Chesnay Schepler"  wrote:
> 
> Please make sure you have loaded the correct source jar, and aren't by chance 
> still using the 1.11.0 source jar.
> 
> On 05/08/2020 09:57, 魏子涵 wrote:
>> Hi, everyone:
>>   I found  the 
>> 【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class 
>> in【flink-runtime_2.11-1.11.1.jar】does not match the source code. Is it a 
>> problem we need to fix(if it is, what should we do)? or just let it go?
>> 
>> 
>>  
> 
> 
> 
>  



?????? flink-1.11 ????????

2020-08-05 Thread kcz
 







Re:Re: The bytecode of the class does not match the source code

2020-08-05 Thread 魏子涵
I'm sure the two versions match up. Following is the pic comparing codes in IDEA
https://img-blog.csdnimg.cn/20200805180232929.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NTRE5yaG1t,size_16,color_FF,t_70
















At 2020-08-05 16:46:11, "Chesnay Schepler"  wrote:

Please make sure you have loaded the correct source jar, and aren't by chance 
still using the 1.11.0 source jar.



On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
  I found  the 【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 
class in【flink-runtime_2.11-1.11.1.jar】does not match the source code. Is it a 
problem we need to fix(if it is, what should we do)? or just let it go?




 




Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 Thread Wei Zhong
Hi Xingbo,

Thanks for your information. 

I think the PySpark documentation redesign deserves our attention. It 
seems that the Spark community has also begun to take the user experience of 
its Python documentation more seriously. We can keep following the 
discussion and progress of that redesign in the Spark community. It is so 
similar to our own work that there should be some ideas worth borrowing.

Best,
Wei


> On Aug 5, 2020, at 15:02, Xingbo Huang wrote:
> 
> Hi,
> 
> I found that the spark community is also working on redesigning pyspark 
> documentation[1] recently. Maybe we can compare the difference between our 
> document structure and its document structure.
> 
> [1] https://issues.apache.org/jira/browse/SPARK-31851 
> 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html
>  
> 
> 
> Best,
> Xingbo
> 
> David Anderson <da...@alpinegizmo.com> wrote on Wed, Aug 5, 2020 at 3:17 AM:
> I'm delighted to see energy going into improving the documentation.
> 
> With the current documentation, I get a lot of questions that I believe 
> reflect two fundamental problems with what we currently provide:
> 
> (1) We have a lot of contextual information in our heads about how Flink 
> works, and we are able to use that knowledge to make reasonable inferences 
> about how things (probably) work in cases we aren't so familiar with. For 
> example, I get a lot of questions of the form "If I use  will I 
> still have exactly once guarantees?" The answer is always yes, but they 
> continue to have doubts because we have failed to clearly communicate this 
> fundamental, underlying principle. 
> 
> This specific example about fault tolerance applies across all of the Flink 
> docs, but the general idea can also be applied to the Table/SQL and PyFlink 
> docs. The guiding principles underlying these APIs should be written down in 
> one easy-to-find place. 
> 
> (2) The other kind of question I get a lot is "Can I do  with ?" E.g., 
> "Can I use the JDBC table sink from PyFlink?" These questions can be very 
> difficult to answer because it is frequently the case that one has to reason 
> about why a given feature doesn't seem to appear in the documentation. It 
> could be that I'm looking in the wrong place, or it could be that someone 
> forgot to document something, or it could be that it can in fact be done by 
> applying a general mechanism in a specific way that I haven't thought of -- 
> as in this case, where one can use a JDBC sink from Python if one thinks to 
> use DDL. 
> 
> So I think it would be helpful to be explicit about both what is, and what is 
> not, supported in PyFlink. And to have some very clear organizing principles 
> in the documentation so that users can quickly learn where to look for 
> specific facts.
> 
> Regards,
> David
> 
> 
> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun  > wrote:
> Hi Seth and David,
> 
> I'm very happy to have your reply and suggestions. I would like to share my 
> thoughts here:
> 
> The main motivation we want to refactor the PyFlink doc is that we want to 
> make sure that the Python users could find all they want starting from the 
> PyFlink documentation mainpage. That’s, the PyFlink documentation should have 
> a catalogue which includes all the functionalities available in PyFlink. 
> However, this doesn’t mean that we will make a copy of the content of the 
> documentation in the other places. It may be just a reference/link to the 
> other documentation if needed. For the documentation added under PyFlink 
> mainpage, the principle is that it should only include Python specific 
> content, instead of making a copy of the Java content.
> 
> >>  I'm concerned that this proposal duplicates a lot of content that will 
> >> quickly get out of sync. It feels like it is documenting PyFlink 
> >> separately from the rest of the project.
> 
> Regarding the concerns about maintainability, as mentioned above, The goal of 
> this FLIP is to provide an intelligible entrance of Python API, and the 
> content in it should only contain the information which is useful for Python 
> users. There are indeed many agenda items that duplicate the Java documents 
> in this FLIP, but it doesn't mean the content would be copied from Java 
> documentation. i.e, if the content of the document is the same as the 
> corresponding Java document, we will add a link to the Java document. e.g. 
> the "Built-in functions" and "SQL". We only create a page for the Python-only 
> content, and then redirect to the Java document if there is something shared 
> with Java. e.g. "Connectors" and "Catalogs". If the document is Python-only 
> and already exists, we will move it from the old python 

Re: flink1.10.1/1.11.1 state keeps growing after SQL group-by and time-window operations

2020-08-05 Thread Congxian Qiu
Hi
  The RocksDB state backend only needs a bit of configuration in flink-conf [1].

  Also, from your previous two emails some details are still unclear to me. Could you paste the flink-conf you are
currently using, a screenshot of the checkpoint UI, and a screenshot of the checkpoint directory on HDFS?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend

Best,
Congxian
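
For readers following the thread, a minimal sketch of trying the RocksDB state backend with incremental checkpoints
programmatically; it assumes the flink-statebackend-rocksdb dependency is on the classpath, the HDFS path and the
5-minute interval are placeholders, and the same effect can be configured cluster-wide in flink-conf via keys such as
state.backend and state.backend.incremental:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Second constructor argument enables incremental checkpoints; the HDFS path is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/my-job/", true));

        // A 5-minute checkpoint interval, purely illustrative.
        env.enableCheckpointing(5 * 60 * 1000);

        // ... build and execute the job ...
    }
}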


op <520075...@qq.com> wrote on Wed, Aug 5, 2020 at 4:03 PM:

> Hi, the TTL configuration is
> val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
> val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> val tConfig = tableEnv.getConfig
> tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
>
>
>   1) Currently 3 jobs all show this behavior
>   2) The cluster does not have a RocksDB environment at the moment
> Thanks
> ------------------ Original ------------------
> From: "user-zh" <qcx978132...@gmail.com>
> Sent: Wednesday, Aug 5, 2020, 3:30 PM
> To: "user-zh"
> Subject: Re: flink1.10.1/1.11.1 state keeps growing after SQL group-by and time-window operations
>
>
>
> Hi op
>  This situation is rather strange. I would like to confirm:
>  1) Do all of your jobs see the checkpoint size growing continuously, or is it only this type of job?
>  2) Have you tried the RocksDBStateBackend (full and incremental)? What were the results?
>
>  Also, how are the other TTL-related options configured?
>
> In principle, a checkpoint is just a snapshot of the state, so if the checkpoints keep getting larger, there is more and more state.
> Best,
> Congxian
>
>
> op <520075...@qq.com> wrote on Wed, Aug 5, 2020 at 2:46 PM:
>
>> Hi, I am using the FsStateBackend. Increasing the interval to 5 minutes makes no difference; each checkpoint takes about 300ms,
>> and our business data volume is roughly the same every day.
>> The idle-state retention time is set to 1440 minutes, so in theory the state size should level off after one day of running,
>> but the job has now been running for 5 days and the size of the checkpoint shared directory keeps increasing.
>> I have also confirmed that the group-by keys only appear on the day they are processed, i.e. that day's state becomes
>> idle once the day is over, so 5 days of running should satisfy the cleanup condition.
>>
>>
>> ------------------ Original ------------------
>> From: "user-zh" <qcx978132...@gmail.com>
>> Sent: Monday, Aug 3, 2020, 5:50 PM
>> To: "user-zh"
>> Subject: Re: flink1.10.1/1.11.1 state keeps growing after SQL group-by and time-window operations
>>
>>
>>
>> Hi
>>   Can you increase the checkpoint interval a bit and see whether it stays stable? Judging from the size of the shared
>> directory, it grows at first and then stays roughly flat. Note that "Checkpointed Data Size" is the incremental size [1],
>> not the size of the whole checkpoint; if a lot of data changes between checkpoints, this value becomes larger.
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
>>
>> Best,
>> Congxian
>>
>>
>> op <520075...@qq.com> wrote on Mon, Aug 3, 2020 at 2:18 PM:
>>
>>> Same question here. I am also seeing the state grow continuously, on 1.11.0, with checkpoints stored on HDFS and a
>>> 3-minute checkpoint interval. The logic groups by event day and id with a dozen or so aggregation metrics.
>>> After about 7 days of running the state keeps increasing, even though an idle-state retention time is set and the
>>> watermark is advancing normally:
>>> tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1440+10))
>>>
>>> ------------------ Original ------------------
>>> From: "user-zh" <384939...@qq.com>
>>> Sent: Monday, Aug 3, 2020, 1:50 PM
>>> To: "user-zh"
>>> Subject: Re: flink1.10.1/1.11.1 state keeps growing after SQL group-by and time-window operations
>>>
>>>
>>>
>>> hi:
>>> I switched back to incremental mode and collected some more data:
>>> 1. Processing speed: 3000 records per second, in a test environment with a stable, almost fluctuation-free load
>>> 2. The checkpoint interval is set to 5 seconds
>>> 3. The job currently uses a one-minute window
>>> 4. Parallelism is 1, running in on-yarn mode
>>>
>>> Right after startup:
>>> <http://apache-flink.147419.n8.nabble.com/file/t793/6.png>
>>>
>>> 18 minutes later:
>>> <http://apache-flink.147419.n8.nabble.com/file/t793/9.png>
>>>
>>> Checkpoint settings:
>>> <http://apache-flink.147419.n8.nabble.com/file/t793/conf.png>
>>>
>>> Size on HDFS:
>>> <http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png>
>>>
>>> Size shown on the web UI:
>>> <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png>
>>>
>>>
>>> Congxian Qiu wrote
>>>> Hi 鱼子酱
>>>>    In the incremental-checkpoint mode, could you post a screenshot of how the checkpoint size evolves? If possible,
>>>> please also ls the size of the checkpoint directory on HDFS after each checkpoint completes.
>>>>    One more question still needs an answer: roughly what is your processing rate, and can you estimate how often the state is updated?
>>>>
>>>> Best,
>>>> Congxian
>>>>
>>>>
>>>> 鱼子酱 <
>>>> 384939718@
>>>>> wrote on Thu, Jul 30, 2020 at 10:43 AM:
>>>

Re: The bytecode of the class does not match the source code

2020-08-05 Thread Chesnay Schepler
Please make sure you have loaded the correct source jar, and aren't by 
chance still using the 1.11.0 source jar.


On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
      I found  the 
【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class 
in【flink-runtime_2.11-1.11.1.jar】does not match the source code. Is it 
a problem we need to fix(if it is, what should we do)? or just let it go?







Re:Re: Problem writing to Hive

2020-08-05 Thread air23


Hi, thanks. Removing the version number does indeed work. The version I had configured is the same as the Hive version I installed, so I am not sure what caused the problem.


On 2020-08-05 15:59:06, "wldd" wrote:
>hi:
>1. Check whether the Hive version you set when configuring the hive catalog matches the Hive version you are actually using.
>2. You can also try not setting the Hive version when configuring the hive catalog.
>
>
>
>
>
>
>
>
>
>
>
>
>
>--
>
>Best,
>wldd
>
>
>
>
>
>On 2020-08-05 15:38:26, "air23" wrote:
>>Hi,
>>15:33:59,781 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>>- Created HiveCatalog 'myhive1'
>>Exception in thread "main" 
>>org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create 
>>Hive Metastore client
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:58)
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:71)
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
>>at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
>>at 
>>org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
>>at 
>>org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
>>at com.zt.kafka.KafkaTest4.main(KafkaTest4.java:73)
>>Caused by: java.lang.NoSuchMethodException: 
>>org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(org.apache.hadoop.hive.conf.HiveConf)
>>at java.lang.Class.getMethod(Class.java:1786)
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:54)
>>... 7 mor
>>
>>
>>
>>
>>What is causing this problem? The Metastore has already been started.
>>Thanks


flink sql eos

2020-08-05 Thread sllence
Hi everyone,

   Is it the case that Flink SQL currently has no way to enable global end-to-end exactly-once semantics (EOS)?

At the moment only Kafka implements TwoPhaseCommitSinkFunction, and the Kafka DDL has no property for setting
Semantic to EXACTLY_ONCE.

Could we support more transactional connectors, support enabling global end-to-end consistency at the Flink SQL
level, and verify for each connector whether it supports EXACTLY_ONCE?

If global EXACTLY_ONCE is enabled and every connector in use supports EXACTLY_ONCE, can the whole application
then achieve end-to-end exactly-once semantics?
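
For reference, the contract such a transactional connector would have to fulfil is Flink's TwoPhaseCommitSinkFunction.
A minimal sketch follows; MyTxn and the commented-out external-system calls are placeholders, not a real connector:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Placeholder for a handle to an open transaction in the external system.
class MyTxn {
}

public class MyTransactionalSink extends TwoPhaseCommitSinkFunction<String, MyTxn, Void> {

    public MyTransactionalSink() {
        super(new KryoSerializer<>(MyTxn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected MyTxn beginTransaction() throws Exception {
        return new MyTxn(); // open a transaction in the external system
    }

    @Override
    protected void invoke(MyTxn txn, String value, Context context) throws Exception {
        // write the record inside the open transaction
    }

    @Override
    protected void preCommit(MyTxn txn) throws Exception {
        // flush; called while the checkpoint barrier is being processed
    }

    @Override
    protected void commit(MyTxn txn) {
        // commit the transaction once the checkpoint has completed
    }

    @Override
    protected void abort(MyTxn txn) {
        // roll the transaction back on failure or restore
    }
}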



Re: Kafka transaction error lead to data loss under end to end exact-once

2020-08-05 Thread Khachatryan Roman
Hi Lu,

AFAIK, it's not going to be fixed. As you mentioned in the first email,
Kafka should be configured so that its transaction timeout is less than
your max checkpoint duration.

However, you should not only change transaction.timeout.ms in the producer but
also transaction.max.timeout.ms on your brokers.
Please refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#caveats

Regards,
Roman
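
To make the knobs concrete, here is a minimal sketch of an exactly-once producer setup; the topic name, bootstrap
servers and the 15-minute timeout are illustrative, the exact constructor overload and helper-class packages can
differ slightly between Flink versions, and the broker-side transaction.max.timeout.ms must be at least as large:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaSink {
    public static FlinkKafkaProducer<String> create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");            // placeholder
        // Must be <= transaction.max.timeout.ms configured on the brokers.
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

        return new FlinkKafkaProducer<>(
                "my-topic",                                               // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}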


On Wed, Aug 5, 2020 at 12:24 AM Lu Niu  wrote:

> Hi, Khachatryan
>
> Thank you for the reply. Is that a problem that can be fixed? If so, is
> the fix on roadmap? Thanks!
>
> Best
> Lu
>
> On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Lu,
>>
>> Yes, this error indicates data loss (unless there were no records in the
>> transactions).
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu  wrote:
>>
>>> Hi,
>>>
>>> We are using end to end exact-once flink + kafka and
>>> encountered belowing exception which usually came after checkpoint failures:
>>> ```
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>>> Producer attempted an operation with an old epoch. Either there is a newer
>>> producer with the same transactionalId, or the producer's transaction has
>>> been expired by the broker.2020-07-28 16:27:51,633 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job xxx
>>> (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to
>>> FAILING.java.lang.RuntimeException: Error while confirming checkpoint at
>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219) at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
>>> java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)Caused by:
>>> org.apache.flink.util.FlinkRuntimeException: Committing one of transactions
>>> failed, logging first encountered failure at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214) ... 5
>>> more*
>>> ```
>>> We did some end to end tests and noticed whenever such a thing happens,
>>> there will be a data loss.
>>>
>>> Referring to several related questions, I understand I need to increase `
>>> transaction.timeout.ms`  because:
>>> ```
>>> *Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
>>> that were started before taking a checkpoint, after recovering from the
>>> said checkpoint. If the time between Flink application crash and completed
>>> restart is larger than Kafka’s transaction timeout there will be data loss
>>> (Kafka will automatically abort transactions that exceeded timeout time).*
>>> ```
>>>
>>> But I want to confirm with the community that:
>>> *Does an exception like this will always lead to data loss? *
>>>
>>> I asked because we get this exception sometimes even when the checkpoint
>>> succeeds.
>>>
>>> Setup:
>>> Flink 1.9.1
>>>
>>> Best
>>> Lu
>>>
>>


Re: flink1.10.1/1.11.1 state keeps growing after SQL group-by and time-window operations

2020-08-05 Thread op
Hi, the TTL configuration is
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
val tConfig = tableEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))


  1) Currently 3 jobs all show this behavior
  2) The cluster does not have a RocksDB environment at the moment
Thanks

------------------ Original ------------------
From: "user-zh" <qcx978132...@gmail.com>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> Best,
> Congxian
>
> op <520075...@qq.com> wrote on Mon, Aug 3, 2020 at 2:18 PM:


Re: Problem writing to Hive

2020-08-05 Thread wldd
hi:
1. Check whether the Hive version you set when configuring the hive catalog matches the Hive version you are actually using.
2. You can also try not setting the Hive version when configuring the hive catalog (a sketch follows below).
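
A minimal sketch of suggestion 2, assuming a HiveCatalog constructor without the version argument is available in the
Flink release in use; the catalog name, database and conf directory below are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogWithoutVersion {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // No hiveVersion argument: the catalog detects the Hive version from the
        // hive-exec jar on the classpath (all names and paths here are placeholders).
        HiveCatalog hive = new HiveCatalog("myhive1", "default", "/etc/hive/conf");

        tableEnv.registerCatalog("myhive1", hive);
        tableEnv.useCatalog("myhive1");
    }
}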













--

Best,
wldd





On 2020-08-05 15:38:26, "air23" wrote:
>Hi,
>15:33:59,781 INFO  org.apache.flink.table.catalog.hive.HiveCatalog 
>   - Created HiveCatalog 'myhive1'
>Exception in thread "main" 
>org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create 
>Hive Metastore client
>at 
>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:58)
>at 
>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
>at 
>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:71)
>at 
>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
>at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
>at 
>org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
>at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
>at com.zt.kafka.KafkaTest4.main(KafkaTest4.java:73)
>Caused by: java.lang.NoSuchMethodException: 
>org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(org.apache.hadoop.hive.conf.HiveConf)
>at java.lang.Class.getMethod(Class.java:1786)
>at 
>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:54)
>... 7 mor
>
>
>
>
>What is causing this problem? The Metastore has already been started.
>Thanks


The bytecode of the class does not match the source code

2020-08-05 Thread 魏子涵
Hi, everyone:
  I found that the 【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 
class in 【flink-runtime_2.11-1.11.1.jar】 does not match the source code. Is it a 
problem we need to fix (if it is, what should we do), or should we just let it go?

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Till Rohrmann
Yes, for the other deployments it is not a problem. A reason why people
preferred non-zero exit codes in case of FAILED jobs is that this is easier
to monitor than having to take a look at the actual job result. Moreover,
in the YARN web UI the application shows as failed if I am not mistaken.
However, from a framework's perspective, a FAILED job does not mean that
Flink has failed and, hence, the return code could still be 0 in my opinion.

Cheers,
Till
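
To make the exit-code discussion concrete, here is an illustrative mapping from a terminal job status to a process
exit code following the principle described above; it is not Flink's actual ApplicationStatus or its real exit-code
values, only a sketch of the proposed behaviour:

public class ExitCodes {
    // Illustrative only: status names mirror the discussion, the mapping is hypothetical.
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    static int processExitCode(ApplicationStatus status) {
        switch (status) {
            case SUCCEEDED:
            case FAILED:    // a FAILED job is still a valid termination of the framework
            case CANCELED:
                return 0;   // zero exit code, so K8s does not restart the pod
            default:
                return 1;   // only genuine framework failures produce a non-zero code
        }
    }
}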

On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:

> Hi Eleanore,
>
> Yes, I suggest to use Job to replace Deployment. It could be used to run
> jobmanager one time and finish after a successful/failed completion.
>
> However, using Job still could not solve your problem completely. Just as
> Till said, When a job exhausts the restart strategy, the jobmanager
> pod will terminate with non-zero exit code. It will cause the K8s
> restarting it again. Even though we could set the restartPolicy and
> backoffLimit,
> this is not a clean and correct way to go. We should terminate the
> jobmanager process with zero exit code in such situation.
>
> @Till Rohrmann  I just have one concern. Is it a
> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
> terminating with
> non-zero exit code is harmless.
>
>
> Best,
> Yang
>
> Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>
>> Hi Yang & Till,
>>
>> Thanks for your prompt reply!
>>
>> Yang, regarding your question, I am actually not using k8s job, as I put
>> my app.jar and its dependencies under flink's lib directory. I have 1 k8s
>> deployment for job manager, and 1 k8s deployment for task manager, and 1
>> k8s service for job manager.
>>
>> As you mentioned above, if flink job is marked as failed, it will cause
>> the job manager pod to be restarted. Which is not the ideal behavior.
>>
>> Do you suggest that I should change the deployment strategy from using
>> k8s deployment to k8s job? In case the flink program exit with non-zero
>> code (e.g. exhausted number of configured restart), pod can be marked as
>> complete hence not restarting the job again?
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:
>>
>>> @Till Rohrmann  In native mode, when a Flink
>>> application terminates with FAILED state, all the resources will be cleaned
>>> up.
>>>
>>> However, in standalone mode, I agree with you that we need to rethink
>>> the exit code of Flink. When a job exhausts the restart
>>> strategy, we should terminate the pod and do not restart again. After
>>> googling, it seems that we could not specify the restartPolicy
>>> based on exit code[1]. So maybe we need to return a zero exit code to
>>> avoid restarting by K8s.
>>>
>>> [1].
>>> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>>>
>>> Best,
>>> Yang
>>>
> Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:
>>>
 @Yang Wang  I believe that we should
 rethink the exit codes of Flink. In general you want K8s to restart a
 failed Flink process. Hence, an application which terminates in state
 FAILED should not return a non-zero exit code because it is a valid
 termination state.

 Cheers,
 Till

 On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:

> Hi Eleanore,
>
> I think you are using K8s resource "Job" to deploy the jobmanager.
> Please set .spec.template.spec.restartPolicy = "Never" and
> spec.backoffLimit = 0.
> Refer here[1] for more information.
>
> Then, when the jobmanager failed because of any reason, the K8s job
> will be marked failed. And K8s will not restart the job again.
>
> [1].
> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>
>
> Best,
> Yang
>
> Eleanore Jin wrote on Tue, Aug 4, 2020 at 12:05 AM:
>
>> Hi Till,
>>
>> Thanks for the reply!
>>
>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>> Specifically, I build a custom docker image, which I copied the app jar
>> (not uber jar) and all its dependencies under /flink/lib.
>>
>> So my question is more like, in this case, if the job is marked as
>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>> what are the suggestions for such scenario?
>>
>> Thanks a lot!
>> Eleanore
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>>
>> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> how are you deploying Flink exactly? Are you using the application
>>> mode with native K8s support to deploy a cluster [1] or are you manually
>>> deploying a per-job mode [2]?
>>>
>>> I believe the problem might be that we terminate the Flink process
>>> with a non-zero exit code if the job reaches the 

Problem writing to Hive

2020-08-05 Thread air23
Hi,
15:33:59,781 INFO  org.apache.flink.table.catalog.hive.HiveCatalog  
 - Created HiveCatalog 'myhive1'
Exception in thread "main" 
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create 
Hive Metastore client
at 
org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:58)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:71)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
at 
org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
at com.zt.kafka.KafkaTest4.main(KafkaTest4.java:73)
Caused by: java.lang.NoSuchMethodException: 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(org.apache.hadoop.hive.conf.HiveConf)
at java.lang.Class.getMethod(Class.java:1786)
at 
org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:54)
... 7 mor




What is causing this problem? The Metastore has already been started.
Thanks

Re: flink1.10.1/1.11.1 state keeps growing after SQL group-by and time-window operations

2020-08-05 Thread Congxian Qiu
Hi op
   This situation is rather strange. I would like to confirm:
   1) Do all of your jobs see the checkpoint size growing continuously, or is it only this type of job?
   2) Have you tried the RocksDBStateBackend (full and incremental)? What were the results?

   Also, how are the other TTL-related options configured?

In principle, a checkpoint is just a snapshot of the state, so if the checkpoints keep getting larger, there is more and more state.
Best,
Congxian


op <520075...@qq.com> wrote on Wed, Aug 5, 2020 at 2:46 PM:

> Hi, I am using the FsStateBackend. Increasing the interval to 5 minutes makes no difference; each checkpoint takes about 300ms,
> and our business data volume is roughly the same every day.
> The idle-state retention time is set to 1440 minutes, so in theory the state size should level off after one day of running,
> but the job has now been running for 5 days and the size of the checkpoint shared directory keeps increasing.
> I have also confirmed that the group-by keys only appear on the day they are processed, i.e. that day's state becomes
> idle once the day is over, so 5 days of running should satisfy the cleanup condition.
>
>
> ------------------ Original ------------------
> From: "user-zh" <qcx978132...@gmail.com>
> Sent: Monday, Aug 3, 2020, 5:50 PM
> To: "user-zh"
> Subject: Re: flink1.10.1/1.11.1 state keeps growing after SQL group-by and time-window operations
>
>
>
> Hi
>  Can you increase the checkpoint interval a bit and see whether it stays stable? Judging from the size of the shared
> directory, it grows at first and then stays roughly flat. Note that "Checkpointed Data Size" is the incremental size [1],
> not the size of the whole checkpoint; if a lot of data changes between checkpoints, this value becomes larger.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> Best,
> Congxian
>
>
> op <520075...@qq.com> wrote on Mon, Aug 3, 2020 at 2:18 PM:
>
>> Same question here. I am also seeing the state grow continuously, on 1.11.0, with checkpoints stored on HDFS and a
>> 3-minute checkpoint interval. The logic groups by event day and id with a dozen or so aggregation metrics.
>> After about 7 days of running the state keeps increasing, even though an idle-state retention time is set and the
>> watermark is advancing normally:
>> tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1440+10))
>>
>>
>> ------------------ Original ------------------
>> From: "user-zh" <384939...@qq.com>
>> Sent: Monday, Aug 3, 2020, 1:50 PM
>> To: "user-zh"
>> Subject: Re: flink1.10.1/1.11.1 state keeps growing after SQL group-by and time-window operations
>>
>>
>>
>> hi:
>> I switched back to incremental mode and collected some more data:
>> 1. Processing speed: 3000 records per second, in a test environment with a stable, almost fluctuation-free load
>> 2. The checkpoint interval is set to 5 seconds
>> 3. The job currently uses a one-minute window
>> 4. Parallelism is 1, running in on-yarn mode
>>
>> Right after startup:
>>
>> 18 minutes later:
>>
>> Checkpoint settings:
>>
>> Size on HDFS:
>>
>> Size shown on the web UI:
>> <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png>
>>
>>
>> Congxian Qiu wrote
>>> Hi 鱼子酱
>>>    In the incremental-checkpoint mode, could you post a screenshot of how the checkpoint size evolves? If possible,
>>> please also ls the size of the checkpoint directory on HDFS after each checkpoint completes.
>>>    One more question still needs an answer: roughly what is your processing rate, and can you estimate how often the state is updated?
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> 鱼子酱 <
>>> 384939718@
>>>> wrote on Thu, Jul 30, 2020 at 10:43 AM:
>>>
>>>> Thanks!
>>>>
>>>> In flink 1.11.1 I tried the two backends below. They have now been running for more than 20 hours,
>>>> and the state size fluctuates within a range instead of growing continuously.
>>>> StateBackend backend =new
>>>> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>>>> StateBackend backend =new
>>>> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>>>>
>>>>
>>>> So it looks like there may be some problem on the RocksDBStateBackend incremental-mode side.
>>>> RocksDBStateBackend:
>>>> <http://apache-flink.147419.n8.nabble.com/file/t793/444.png>
>>>> FsStateBackend:
>>>> <http://apache-flink.147419.n8.nabble.com/file/t793/555.png>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>>>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Yang Wang
Hi Eleanore,

Yes, I suggest using a Job instead of a Deployment. It can be used to run
the jobmanager once and finish after a successful/failed completion.

However, using a Job still would not solve your problem completely. Just as
Till said, when a job exhausts the restart strategy, the jobmanager
pod will terminate with a non-zero exit code, which will cause K8s to
restart it again. Even though we could set the restartPolicy and
backoffLimit,
this is not a clean and correct way to go. We should terminate the
jobmanager process with a zero exit code in such a situation.

@Till Rohrmann  I just have one concern. Is it a
special case for the K8s deployment? For standalone/Yarn/Mesos, it seems that
terminating with
a non-zero exit code is harmless.


Best,
Yang

Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:

> Hi Yang & Till,
>
> Thanks for your prompt reply!
>
> Yang, regarding your question, I am actually not using k8s job, as I put
> my app.jar and its dependencies under flink's lib directory. I have 1 k8s
> deployment for job manager, and 1 k8s deployment for task manager, and 1
> k8s service for job manager.
>
> As you mentioned above, if flink job is marked as failed, it will cause
> the job manager pod to be restarted. Which is not the ideal behavior.
>
> Do you suggest that I should change the deployment strategy from using k8s
> deployment to k8s job? In case the flink program exit with non-zero code
> (e.g. exhausted number of configured restart), pod can be marked as
> complete hence not restarting the job again?
>
> Thanks a lot!
> Eleanore
>
> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:
>
>> @Till Rohrmann  In native mode, when a Flink
>> application terminates with FAILED state, all the resources will be cleaned
>> up.
>>
>> However, in standalone mode, I agree with you that we need to rethink the
>> exit code of Flink. When a job exhausts the restart
>> strategy, we should terminate the pod and do not restart again. After
>> googling, it seems that we could not specify the restartPolicy
>> based on exit code[1]. So maybe we need to return a zero exit code to
>> avoid restarting by K8s.
>>
>> [1].
>> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>>
>> Best,
>> Yang
>>
>>> Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:
>>
>>> @Yang Wang  I believe that we should rethink the
>>> exit codes of Flink. In general you want K8s to restart a failed Flink
>>> process. Hence, an application which terminates in state FAILED should not
>>> return a non-zero exit code because it is a valid termination state.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:
>>>
 Hi Eleanore,

 I think you are using K8s resource "Job" to deploy the jobmanager.
 Please set .spec.template.spec.restartPolicy = "Never" and
 spec.backoffLimit = 0.
 Refer here[1] for more information.

 Then, when the jobmanager failed because of any reason, the K8s job
 will be marked failed. And K8s will not restart the job again.

 [1].
 https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup


 Best,
 Yang

 Eleanore Jin wrote on Tue, Aug 4, 2020 at 12:05 AM:

> Hi Till,
>
> Thanks for the reply!
>
> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
> Specifically, I build a custom docker image, which I copied the app jar
> (not uber jar) and all its dependencies under /flink/lib.
>
> So my question is more like, in this case, if the job is marked as
> FAILED, which causes k8s to restart the pod, this seems not help at all,
> what are the suggestions for such scenario?
>
> Thanks a lot!
> Eleanore
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>
> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
> wrote:
>
>> Hi Eleanore,
>>
>> how are you deploying Flink exactly? Are you using the application
>> mode with native K8s support to deploy a cluster [1] or are you manually
>> deploying a per-job mode [2]?
>>
>> I believe the problem might be that we terminate the Flink process
>> with a non-zero exit code if the job reaches the ApplicationStatus.FAILED
>> [3].
>>
>> cc Yang Wang have you observed a similar behavior when running Flink
>> in per-job mode on K8s?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
>> [3]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>>
>> On Fri, Jul 31, 2020 at 6:26 PM 

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 Thread Xingbo Huang
Hi,

I found that the spark community is also working on redesigning pyspark
documentation[1] recently. Maybe we can compare the difference between our
document structure and its document structure.

[1] https://issues.apache.org/jira/browse/SPARK-31851
http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html

Best,
Xingbo

David Anderson wrote on Wed, Aug 5, 2020 at 3:17 AM:

> I'm delighted to see energy going into improving the documentation.
>
> With the current documentation, I get a lot of questions that I believe
> reflect two fundamental problems with what we currently provide:
>
> (1) We have a lot of contextual information in our heads about how Flink
> works, and we are able to use that knowledge to make reasonable inferences
> about how things (probably) work in cases we aren't so familiar with. For
> example, I get a lot of questions of the form "If I use  will
> I still have exactly once guarantees?" The answer is always yes, but they
> continue to have doubts because we have failed to clearly communicate this
> fundamental, underlying principle.
>
> This specific example about fault tolerance applies across all of the
> Flink docs, but the general idea can also be applied to the Table/SQL and
> PyFlink docs. The guiding principles underlying these APIs should be
> written down in one easy-to-find place.
>
> (2) The other kind of question I get a lot is "Can I do  with ?"
> E.g., "Can I use the JDBC table sink from PyFlink?" These questions can be
> very difficult to answer because it is frequently the case that one has to
> reason about why a given feature doesn't seem to appear in the
> documentation. It could be that I'm looking in the wrong place, or it could
> be that someone forgot to document something, or it could be that it can in
> fact be done by applying a general mechanism in a specific way that I
> haven't thought of -- as in this case, where one can use a JDBC sink from
> Python if one thinks to use DDL.
>
> So I think it would be helpful to be explicit about both what is, and what
> is not, supported in PyFlink. And to have some very clear organizing
> principles in the documentation so that users can quickly learn where to
> look for specific facts.
>
> Regards,
> David
>
>
> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun 
> wrote:
>
>> Hi Seth and David,
>>
>> I'm very happy to have your reply and suggestions. I would like to share
>> my thoughts here:
>>
>> The main motivation we want to refactor the PyFlink doc is that we want
>> to make sure that the Python users could find all they want starting from
>> the PyFlink documentation mainpage. That’s, the PyFlink documentation
>> should have a catalogue which includes all the functionalities available in
>> PyFlink. However, this doesn’t mean that we will make a copy of the content
>> of the documentation in the other places. It may be just a reference/link
>> to the other documentation if needed. For the documentation added under
>> PyFlink mainpage, the principle is that it should only include Python
>> specific content, instead of making a copy of the Java content.
>>
>> >>  I'm concerned that this proposal duplicates a lot of content that
>> will quickly get out of sync. It feels like it is documenting PyFlink
>> separately from the rest of the project.
>>
>> Regarding the concerns about maintainability, as mentioned above, The
>> goal of this FLIP is to provide an intelligible entrance of Python API, and
>> the content in it should only contain the information which is useful for
>> Python users. There are indeed many agenda items that duplicate the Java
>> documents in this FLIP, but it doesn't mean the content would be copied
>> from Java documentation. i.e, if the content of the document is the same as
>> the corresponding Java document, we will add a link to the Java document.
>> e.g. the "Built-in functions" and "SQL". We only create a page for the
>> Python-only content, and then redirect to the Java document if there is
>> something shared with Java. e.g. "Connectors" and "Catalogs". If the
>> document is Python-only and already exists, we will move it from the old
>> python document to the new python document, e.g. "Configurations". If the
>> document is Python-only and not exists before, we will create a new page
>> for it. e.g. "DataTypes".
>>
>> The main reason we create a new page for Python Data Types is that it is
>> only conceptually one-to-one correspondence with Java Data Types, but the
>> actual document content would be very different from Java DataTypes. Some
>> detailed difference are as following:
>>
>>
>>
>>   - The text in the Java Data Types document is written for JVM-based
>> language users, which is incomprehensible to users who only understand
>> python.
>>
>>   - Currently the Python Data Types does not support the "bridgedTo"
>> method, DataTypes.RAW, DataTypes.NULL and User Defined Types.
>>
>>   - The section "Planner 

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 Thread Xingbo Huang
Hi,

I found that the spark community is also working on redesigning pyspark
documentation[1] recently. Maybe we can compare the difference between our
document structure and its document structure.

[1] https://issues.apache.org/jira/browse/SPARK-31851
http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html

Best,
Xingbo

David Anderson  于2020年8月5日周三 上午3:17写道:

> I'm delighted to see energy going into improving the documentation.
>
> With the current documentation, I get a lot of questions that I believe
> reflect two fundamental problems with what we currently provide:
>
> (1) We have a lot of contextual information in our heads about how Flink
> works, and we are able to use that knowledge to make reasonable inferences
> about how things (probably) work in cases we aren't so familiar with. For
> example, I get a lot of questions of the form "If I use  will
> I still have exactly once guarantees?" The answer is always yes, but they
> continue to have doubts because we have failed to clearly communicate this
> fundamental, underlying principle.
>
> This specific example about fault tolerance applies across all of the
> Flink docs, but the general idea can also be applied to the Table/SQL and
> PyFlink docs. The guiding principles underlying these APIs should be
> written down in one easy-to-find place.
>
> (2) The other kind of question I get a lot is "Can I do  with ?"
> E.g., "Can I use the JDBC table sink from PyFlink?" These questions can be
> very difficult to answer because it is frequently the case that one has to
> reason about why a given feature doesn't seem to appear in the
> documentation. It could be that I'm looking in the wrong place, or it could
> be that someone forgot to document something, or it could be that it can in
> fact be done by applying a general mechanism in a specific way that I
> haven't thought of -- as in this case, where one can use a JDBC sink from
> Python if one thinks to use DDL.
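>
> To make that concrete, a rough sketch of the DDL route from PyFlink might
> look like the following (PyFlink 1.11+ assumed; the connector options,
> database URL, table and column names are purely illustrative, not taken
> from this thread):
>
> from pyflink.table import EnvironmentSettings, TableEnvironment
>
> # Assumes flink-connector-jdbc and a JDBC driver are on the classpath.
> settings = EnvironmentSettings.new_instance() \
>     .in_streaming_mode().use_blink_planner().build()
> t_env = TableEnvironment.create(settings)
>
> # The sink is declared with DDL rather than a dedicated Python API.
> t_env.execute_sql("""
>     CREATE TABLE jdbc_sink (
>         id BIGINT,
>         name STRING
>     ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:mysql://localhost:3306/mydb',
>         'table-name' = 'my_table',
>         'username' = 'user',
>         'password' = 'pass'
>     )
> """)
>
> # Any query can then write into it; "some_source" stands for a table
> # registered elsewhere in the same job.
> t_env.execute_sql("INSERT INTO jdbc_sink SELECT id, name FROM some_source")
>
> The same pattern applies to any connector exposed through DDL, which is
> part of why spelling this out in the Python docs would help.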
>
> So I think it would be helpful to be explicit about both what is, and what
> is not, supported in PyFlink. And to have some very clear organizing
> principles in the documentation so that users can quickly learn where to
> look for specific facts.
>
> Regards,
> David
>
>
> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun 
> wrote:
>
>> Hi Seth and David,
>>
>> I'm very happy to see your replies and suggestions. I would like to share
>> my thoughts here:
>>
>> The main motivation for refactoring the PyFlink docs is to make sure that
>> Python users can find everything they need starting from the PyFlink
>> documentation main page. That is, the PyFlink documentation should have a
>> catalogue which covers all the functionality available in PyFlink.
>> However, this doesn't mean that we will copy content that lives in other
>> places; a reference/link to the other documentation may be enough. For
>> the documentation added under the PyFlink main page, the principle is
>> that it should only include Python-specific content, instead of copies of
>> the Java content.
>>
>> >>  I'm concerned that this proposal duplicates a lot of content that
>> will quickly get out of sync. It feels like it is documenting PyFlink
>> separately from the rest of the project.
>>
>> Regarding the concerns about maintainability, as mentioned above, the
>> goal of this FLIP is to provide an intelligible entry point to the Python
>> API, and its content should only contain information that is useful for
>> Python users. There are indeed many agenda items in this FLIP that
>> duplicate the Java documents, but that doesn't mean the content will be
>> copied from the Java documentation. If the content of a page is the same
>> as the corresponding Java document, we will simply add a link to the Java
>> document, e.g. "Built-in functions" and "SQL". We only create a page for
>> the Python-only content, and redirect to the Java document for anything
>> shared with Java, e.g. "Connectors" and "Catalogs". If a document is
>> Python-only and already exists, we will move it from the old Python
>> documentation to the new one, e.g. "Configurations". If a document is
>> Python-only and does not exist yet, we will create a new page for it,
>> e.g. "Data Types".
>>
>> The main reason we create a new page for Python Data Types is that it
>> corresponds to Java Data Types only conceptually; the actual content of
>> the document would be very different from the Java Data Types page. Some
>> of the differences in detail are as follows:
>>
>>
>>
>>   - The text in the Java Data Types document is written for users of
>> JVM-based languages and is hard to follow for users who only know
>> Python.
>>
>>   - Currently, Python Data Types do not support the "bridgedTo" method,
>> DataTypes.RAW, DataTypes.NULL, or user-defined types (see the short
>> sketch after this list).
>>
>>   - The section "Planner 
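>>
>> As a small, hedged illustration of that correspondence (assuming the
>> PyFlink 1.11 API; the field names below are made up), the Python side
>> declares the same composite types through pyflink.table.DataTypes:
>>
>> from pyflink.table import DataTypes
>>
>> # The composite types Java builds with DataTypes.ROW/ARRAY/MAP exist
>> # on the Python side as well ...
>> row_type = DataTypes.ROW([
>>     DataTypes.FIELD("id", DataTypes.BIGINT()),
>>     DataTypes.FIELD("tags", DataTypes.ARRAY(DataTypes.STRING())),
>>     DataTypes.FIELD("attrs", DataTypes.MAP(DataTypes.STRING(),
>>                                            DataTypes.STRING()))
>> ])
>>
>> # ... but bridgedTo(...), DataTypes.RAW and DataTypes.NULL have no
>> # Python counterpart yet, which is what the new page needs to spell out.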

Re: flink1.10.1/1.11.1 using SQL group by: the state size keeps growing

2020-08-05 Thread op
  
I'm using FsStateBackend with a checkpoint every 5 minutes, each taking about 300ms.
The idle state retention is set to 1440 minutes, but the checkpointed state shared
by the group by keys keeps growing and has not shrunk even after 5 days.




------ Original message ------
From: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
Best,
Congxian


op <520075...@qq.com> wrote on 2020-08-03 at 2:18:

 I'm on 1.11.0 with checkpoints on HDFS, and the checkpoint has kept growing
 for 3 days. The query does a group by on day and id, with a 7-day watermark,
 and I have set
 tConfig.setIdleStateRetentionTime(Time.minutes(1440),
 Time.minutes(1440+10))




 ------ Original message ------
 From: "user-zh" <384939...@qq.com>
 Date: 2020-08-03 (Mon) 1:50
 To: "user-zh"

 http://apache-flink.147419.n8.nabble.com/file/t793/6.png