Re: Flink SQL tumbling window produces no result set

2020-05-29 by Benchao Li
CC user-zh

Hi steven,
I just realized my earlier reply went only to your personal mailbox without CC'ing the community; I have now CC'ed the user-zh mailing list.
Second, I only now noticed that your original mail went to the user list rather than user-zh. Next time, please send Chinese-language mails directly to user-zh instead of user;
the user list is meant for English discussion.

As for your question: I believe the watermark's timezone has no effect on how your data is computed. With or without a timezone offset,
the watermark and the event time stay aligned with each other. If you don't need to write the time out to the sink, you can ignore the watermark offset for now.
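
If you do want the window boundaries written out in local wall-clock time, one possible workaround (a hedged sketch, not something confirmed in this thread; the field names follow the DDL quoted below, and the sign of the offset depends on which direction your mismatch goes) is to fold the UTC+8 offset into the derived event-time column:

    -- sketch: shift the epoch before deriving the event-time column, so that
    -- TUMBLE_START/TUMBLE_END are formatted in local time
    t AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000 + 8 * 3600, 'yyyy-MM-dd HH:mm:ss'))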

steven chen  wrote on Sat, May 30, 2020 at 1:28 AM:

> 1. I added the timezone function, but the watermark is still off by 8 hours. Is the only option to subtract the 8 hours at the data source, before the data enters Flink?
>  2. The event-time windows are exactly 8 hours late. Can that be fixed by adding a timezone function? If so, how do I add it in SQL?
>
> steven chen
> Email: stevenche...@163.com
>
> 
>
>
> On 2020-05-29 17:22, steven chen  wrote:
>
>
>
> Thanks a lot! My thinking is much clearer now. Thanks for the ideas and the proposed approach.
>
>
>
>
> On 2020-05-29 17:12:45, "Benchao Li"  wrote:
>
> 1. What you are using is TUMBLE (a tumbling window), not a sliding window. Window boundaries are derived purely from the data's timestamps;
>   e.g. with a 5-minute window you get whole-interval windows such as [1:00, 1:05), [1:05, 1:10).
> 2. If you only observe output 30 minutes later, that is most likely related to your watermark, e.g. some source subtasks
>   have watermarks noticeably smaller than the other subtasks.
> 3. If event time is not a hard requirement for your case, a processing-time window should match your expectation better.
>
> steven chen  wrote on Fri, May 29, 2020 at 3:46 PM:
>
>>
>>
>> Thanks!
>> 1. How do I set the trigger time of this sliding window in SQL?
>> 2. Data is coming in now, but the window result set is not sinked to MySQL after 5 minutes;
>> each new window seems to get written to MySQL only about 30 minutes later, or after more data arrives in a later time range. What causes this?
>> 3. What I want: whenever data comes in, the 5-minute window closes on time and triggers an INSERT of the result set into MySQL.
>>
>>
>>
>>
>>
>> On 2020-05-29 15:36:08, "Benchao Li"  wrote:
>>
>> This should be a timezone issue; the TO_TIMESTAMP function is timezone-aware. See the community docs on this function [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html
>>
>> steven chen  wrote on Fri, May 29, 2020 at 3:19 PM:
>>
>>> One thing puzzles me: the web UI shows the watermark as 22:58 tonight. Why is that?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 2020-05-29 15:02:21, "Benchao Li"  wrote:
>>>
>>> Hi,
>>>
>>> Flink requires the time field used for watermarks to be in milliseconds. Please check whether your watermark looks normal; that is probably where the problem is.
>>>
>>>
>>> steven chen  wrote on Fri, May 29, 2020 at 2:34 PM:
>>>
 Data comes in every time and gets aggregated, but why is the INSERT result never saved into MySQL? Is it a problem with the SQL, or something else? Any help appreciated.
 CREATE TABLE user_behavior (

 itemCode VARCHAR,

 ts BIGINT COMMENT 'timestamp',

 t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')),

 proctime as PROCTIME(),

 WATERMARK FOR t as t - INTERVAL '5' SECOND

 ) WITH (

 'connector.type' = 'kafka',

 'connector.version' = '0.11',

 'connector.topic' = 'scan-flink-topic',

 'connector.properties.group.id' ='qrcode_pv_five_min',

 'connector.startup-mode' = 'latest-offset',

 'connector.properties.zookeeper.connect' = 'localhost:2181',

 'connector.properties.bootstrap.servers' = 'localhost:9092',

 'update-mode' = 'append',

 'format.type' = 'json',

 'format.derive-schema' = 'true'
 );
 CREATE TABLE pv_five_min (
 item_code VARCHAR,
 dt VARCHAR,
 dd VARCHAR,
 pv BIGINT
 ) WITH (
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode',
 'connector.table' = 'qrcode_pv_five_min',
 'connector.driver' = 'com.mysql.jdbc.Driver',
 'connector.username' = 'root',
 'connector.password' = 'root',
 'connector.write.flush.max-rows' = '1'
 );
 INSERT INTO pv_five_min
 SELECT
 itemCode As item_code,
 DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
 DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
 COUNT(*) AS pv
 FROM user_behavior
 GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;







>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>>
>>
>>
>>
>
>
> --
>
> Best,
> Benchao Li
>
>
>
>
>
>

-- 

Best,
Benchao Li


Re: Very high latency when using event time with session windows

2020-05-29 by Congxian Qiu
Hi

If the processing-time window works but the event-time window does not, you need to check whether the watermark generation
under event time matches your expectation: if the watermark never passes the window's end time, the window is never triggered.

Best,
Congxian
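
A small debugging sketch of the point above (assuming the PredictResult type quoted below): it reports how far the watermark is past a fired window's end, since an event-time window only fires once the watermark crosses that boundary. A consistently large gap here, e.g. from idle source subtasks holding the overall watermark back, would explain the observed 30-second latency.

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // probe: report watermark progress relative to the fired window
    public class WatermarkLagProbe
            extends ProcessWindowFunction<PredictResult, String, Tuple, TimeWindow> {
        @Override
        public void process(Tuple key, Context ctx,
                            Iterable<PredictResult> elements, Collector<String> out) {
            long watermark = ctx.currentWatermark();      // current event-time watermark
            long windowEnd = ctx.window().maxTimestamp(); // last timestamp in the window
            out.collect("window ending " + windowEnd + " fired, watermark " + watermark
                    + ", lag " + (watermark - windowEnd) + " ms");
        }
    }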


李佳宸  wrote on Fri, May 22, 2020 at 3:27 PM:

> Hi all,
>
> I've run into a problem I can't figure out and would like to ask for advice.
>
> My code does keyBy(userId), then uses a session window to aggregate by userId and runs a topN function.
>
> The code is roughly as follows:
> // TopN aggregation
> DataStream<String> itemList = resultDataStream
>         .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100)) {
>                     @Override
>                     public long extractTimestamp(PredictResult predictResult) {
>                         return predictResult.getDate_timestamp();
>                     }
>                 })
>         .keyBy("userId")
>         .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
>         .process(new TopNService(11));
> itemList.print("IRS_RESULT: ");
>
>
> The job latency is extremely high, up to 30 seconds, which is completely unacceptable. At first I suspected my own
> topN function, but after switching to a processing-time session window the latency dropped below one second.
> The downside of processing time is that the gap is hard to estimate: too large hurts job latency, too small breaks the intended aggregation and causes errors (and unstable runs).
> I don't quite understand why this happens; any hints on how to approach it would be appreciated.
>
>
> Thanks
>
> // TopN function
>
> public static class TopNService extends
>         ProcessWindowFunction<PredictResult, String, Tuple, TimeWindow> {
>
>     private final int topSize;
>
>     public TopNService(int topSize) {
>         this.topSize = topSize;
>     }
>
>     @Override
>     public void process(Tuple tuple, Context context,
>                         Iterable<PredictResult> iterable, Collector<String> collector) throws
>             Exception {
>         List<PredictResult> allItems = new ArrayList<>();
>         for (PredictResult predictResult : iterable) {
>             allItems.add(predictResult);
>         }
>         allItems.sort(new Comparator<PredictResult>() {
>             @Override
>             public int compare(PredictResult o1, PredictResult o2) {
>                 return o2.probability.compareTo(o1.probability);
>             }
>         });
>         int userId = allItems.get(0).userId;
>         String logonType = allItems.get(0).getLogonType();
>         StringBuilder result = new StringBuilder();
>         for (int i = 0; i < Math.min(topSize, allItems.size()); i++) {
>             PredictResult currentItem = allItems.get(i);
>             result.append(currentItem.serviceId).append(",");
>         }
>         LocalDate localDate = LocalDate.now();
>         LocalTime localTime = LocalTime.now();
>         // NXZW_ZNTJ_TOPIC_IRS_RESULT message format: start
>         JSONObject resultJson = new JSONObject();
>         resultJson.put("user_id", userId);
>         resultJson.put("logon_type", logonType);
>         resultJson.put("date", localDate + " " + localTime);
>         JSONArray jsonArray = new JSONArray();
>         jsonArray.add(resultJson);
>         resultJson.put("service_id", result.toString());
>         // NXZW_ZNTJ_TOPIC_IRS_RESULT message format: end
>         collector.collect(jsonArray.toString());
>     }
> }
>


Re: Re: Global state

2020-05-29 by Congxian Qiu
hi
Flink has no notion of global state for now; all state is scoped to a single parallel subtask. You can, however, emulate
global state with broadcast state [1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/broadcast_state.html
Best,
Congxian
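
A minimal sketch of the broadcast-state idea (the String types and the "key=value" rule format are assumptions for illustration): every parallel subtask sees the same broadcast state, which is what makes it behave like a shared mapping.

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class BroadcastStateSketch {
        public static DataStream<String> apply(DataStream<String> data, DataStream<String> rules) {
            final MapStateDescriptor<String, String> desc = new MapStateDescriptor<>(
                    "global-mapping", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            BroadcastStream<String> broadcast = rules.broadcast(desc);
            return data.connect(broadcast)
                    .process(new BroadcastProcessFunction<String, String, String>() {
                        @Override
                        public void processElement(String value, ReadOnlyContext ctx,
                                                   Collector<String> out) throws Exception {
                            // every subtask reads the same broadcast state (read-only here)
                            String mapped = ctx.getBroadcastState(desc).get(value);
                            out.collect(mapped != null ? mapped : value);
                        }

                        @Override
                        public void processBroadcastElement(String rule, Context ctx,
                                                            Collector<String> out) throws Exception {
                            // assumed rule format "key=value"
                            String[] kv = rule.split("=", 2);
                            ctx.getBroadcastState(desc).put(kv[0], kv[1]);
                        }
                    });
        }
    }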


a773807...@gmail.com  wrote on Wed, May 27, 2020 at 3:47 PM:

> It won't: in the second keyBy, key by id-name and then reduce;
> for duplicated values, compare their time fields and keep the value with the smallest time, which deduplicates them.
>
>
>
> a773807...@gmail.com
>
> From: star
> Sent: 2020-05-27 15:45
> To: user-zh
> Subject: Re: Re: Global state
> Thanks for the suggestion, but that would duplicate the data and double its volume. For now it looks like I can only rely on external storage.
>
>
>
> Sent from my iPhone
>
>
> ------ Original message ------
> From: a773807...@gmail.com  Sent: May 27, 2020 11:32
> To: user-zh  Subject: Re: Re: Global state
>
>
>
> Hi
>  I've thought of an approach you could consider.
> Source data:
>  id:1,name:A,value:A1, time: t1
>  id:2,name:A,value:A2, time: t2
>  id:1,name:B,value:A3, time: t3
> After a flatMap:
>  id-name: 1-A, key: 1, value: A1, time: t1
>  id-name: 1-A, key: A, value: A1, time:t1
>  id-name: 2-B, key: 2, value: A2, time: t2
>  id-name: 2-B, key: B, value: A2, time: t2
>  id-name: 1-B, key: 1, value: A3, time:t3
>  id-name: 1-B, key: B, value: A3, time:t3
> Then keyBy on key, and in the keyed state pick the matching value based on the time;
> then do one more keyBy, merging back by id-name, and that solves your problem.
>
>
>
> a773807...@gmail.com
> 
> From: star
> Sent: 2020-05-27 00:05
> To: user-zh
> Subject: Re: Global state
> Thanks for the reply; implementing this through keyBy details is somewhat hard.
> id:1,name:A,value:A1
> id:2,name:A,value:A2
> id:1,name:B,value:A3
> 
> 
> Of the three records above, the first arrives first; when the latter two arrive and their id or name matches the first record's id or name, their value takes the first record's value.
> Output:
> id:1,name:A,value:A1
> id:2,name:A,value:A1
> id:1,name:B,value:A1
> 
> 
> The original idea was to keep a map whose keys are the ids and names and whose value is the corresponding value; for the example above, the MapState k,v pairs would be:
> 
> 
> key:1,value:A1
> key:A,value:A1
> key:2,value:A1
> key:B,value:A1
> 
> 
> 
> Sent from my iPhone
> 
> 
> ------ Original message ------
> From: tison  Sent: May 26, 2020 19:01
> To: user-zh  Subject: Re: Global state
> 
> 
> 
> Global state at arbitrary parallelism is physically impossible; have a look at how a distributed computing system deploys physical jobs. "Global state" has to rely either on external storage or on implementation (deployment) details.
> 
> Could your requirement be implemented with a custom keyBy (KeySelector)? See the docs:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions
> 
> 
> Best,
> tison.
> 
> 
> star <3149768...@qq.com> wrote on Tue, May 26, 2020 at 6:42 PM:
> 
> > Is there a global state facility? I have a requirement to map the id and name in my data, i.e. if two records share the same id or name they map to the same value; right now I can only use operator
> > state with the parallelism set to 1 to emulate global state.
> >
> >
> > Thanks
> >
> > Sent from my iPhone
>


Re: FlinkKafkaConsumer reading Avro messages from Kafka keeps failing with AvroDeserializationSchema, while a hand-written AbstractDeserializationSchema works

2020-05-29 by hyangvv
Hi, I'm not using the Confluent Schema Registry Avro format; I implemented Kafka's
Serializer simply by calling the toByteBuffer method of the Avro-generated class.
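
A minimal sketch of the working approach mentioned above, assuming UserView is the Avro-generated SpecificRecord class (fromByteBuffer is the code-generated helper that mirrors the toByteBuffer used on the producer side):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

    public class UserViewDeserializationSchema extends AbstractDeserializationSchema<UserView> {
        @Override
        public UserView deserialize(byte[] message) throws IOException {
            // plain Avro bytes, no Confluent magic byte expected
            return UserView.fromByteBuffer(ByteBuffer.wrap(message));
        }
    }

It would then be plugged in as new FlinkKafkaConsumer<>("UserView", new UserViewDeserializationSchema(), properties).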



> On May 29, 2020 at 12:31 PM, Leonard Xu  wrote:
> 
> Hi, 
>> I generated the UserView class from a schema with the Avro code-generation tool and implemented Kafka's Serializer interface,
> When you write Avro-format data into Kafka with a Kafka Serializer, is it written in the Confluent Schema
> Registry Avro format?
> Confluent Schema Registry 
> writes an extra MAGIC_BYTE when handling Avro data, which plain Avro does not have; when consuming, try
> ConfluentRegistryAvroDeserializationSchema.
> 
> 
> Best,
> Leonard Xu
> [1] 
> https://issues.apache.org/jira/browse/FLINK-16048?focusedCommentId=17036670=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17036670
>  
> 
> 
> 
>> On May 29, 2020 at 01:14, hyangvv  wrote:
>> 
>> I generated the UserView class from a schema with the Avro code-generation tool and implemented Kafka's Serializer interface; the code is as follows:
>> import org.apache.kafka.common.serialization.Serializer;
>> 
>> import java.io.IOException;
>> 
>> public class UserViewSerializer implements Serializer<UserView> {
>>   @Override
>>   public byte[] serialize(String topic, UserView data) {
>>   byte[] array = null;
>>   try {
>>   array = data.toByteBuffer().array();
>>   } catch (IOException e) {
>>   e.printStackTrace();
>>   }
>>   return array;
>>   }
>> }
>> Construct the Kafka producer and write UserView instances into the Kafka topic:
>> KafkaProducer<String, UserView> producer = new KafkaProducer<>(props, new 
>> StringSerializer(), new UserViewSerializer());
>> When FlinkKafkaConsumer in the Flink program consumes these Avro messages from the Kafka topic, the problem described in the subject appears. The failing code is:
>> FlinkKafkaConsumer<GenericRecord> myConsumer = new 
>> FlinkKafkaConsumer<>("UserView", 
>> AvroDeserializationSchema.forGeneric(SCHEMA), properties);
>> The exception that makes the job fail is:
>> 
>> Caused by: java.io.EOFException
>>   at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw 
>> (BinaryDecoder.java:827)
>>   at org.apache.avro.io.BinaryDecoder.doReadBytes (BinaryDecoder.java:349)
>>   at org.apache.avro.io.BinaryDecoder.readString (BinaryDecoder.java:263)
>>   at org.apache.avro.io.ResolvingDecoder.readString 
>> (ResolvingDecoder.java:201)
>>   at org.apache.avro.generic.GenericDatumReader.readString 
>> (GenericDatumReader.java:422)
>>   at org.apache.avro.generic.GenericDatumReader.readString 
>> (GenericDatumReader.java:414)
>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion 
>> (GenericDatumReader.java:181)
>>   at org.apache.avro.generic.GenericDatumReader.read 
>> (GenericDatumReader.java:153)
>>   at org.apache.avro.generic.GenericDatumReader.readField 
>> (GenericDatumReader.java:232)
>>   at org.apache.avro.generic.GenericDatumReader.readRecord 
>> (GenericDatumReader.java:222)
>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion 
>> (GenericDatumReader.java:175)
>>   at org.apache.avro.generic.GenericDatumReader.read 
>> (GenericDatumReader.java:153)
>>   at org.apache.avro.generic.GenericDatumReader.read 
>> (GenericDatumReader.java:145)
>>   at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize 
>> (AvroDeserializationSchema.java:135)
>>   at 
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize
>>  (KafkaDeserializationSchemaWrapper.java:45)
>>   at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop
>>  (KafkaFetcher.java:140)
>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run 
>> (FlinkKafkaConsumerBase.java:718)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run 
>> (StreamSource.java:100)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run 
>> (StreamSource.java:63)
>>   at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run
>>  (SourceStreamTask.java:200)
>> 
>> Any guidance would be much appreciated.
>> 
> 



Re: Kafka Consumer deserialization error

2020-05-29 by tison
Also, for general documentation about classloading, see

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Best,
tison.
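
A hedged sketch of the configuration being alluded to (conf/flink-conf.yaml, Flink 1.10 per-job on YARN); whether both keys are needed depends on the setup:

    # load user code child-first instead of from the parent classpath
    classloader.resolve-order: child-first
    # keep the user jar out of the YARN system classpath so the setting above applies
    yarn.per-job-cluster.include-user-jar: DISABLED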


tison  wrote on Fri, May 29, 2020 at 7:46 PM:

> This should be a classloading-order problem; configure child-first classloading. For per-job on 1.10, I recall a specific config needs to be set.
>
> See this doc
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath
>
> Best,
> tison.
>
>
> Even <452232...@qq.com> wrote on Fri, May 29, 2020 at 6:48 PM:
>
>> Thanks! How should I handle this to avoid the problem?
>>
>>
>>
>>
>> ------ Original message ------
>> From: "zz zhang"  Sent: Friday, May 29, 2020 5:16 PM
>> To: "user-zh"  jkill...@dingtalk.com;
>>
>> Subject: Re: Kafka Consumer deserialization error
>>
>>
>>
>> It should be a maven-shade configuration problem:
>>
>> the original class file org.apache.kafka.common.serialization.ByteArrayDeserializer was rewritten to org.
>> apache.flink.kafka.shaded.org
>> .apache.kafka.common.serialization.ByteArrayDeserializer, and the Deserializer interface it implements was shaded as well, presumably to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer, hence the exception
>>
>> 夏帅  wrote:
>>  Could it be a jar conflict? Worth ruling out.
>> 
>> 
>>  --
>>  From: Even <452232...@qq.com
>>  Sent: Friday, May 29, 2020 16:17
>>  To: user-zh   Subject: Kafka Consumer deserialization error
>> 
>>  Hi!
>>  A question about Kafka Consumer deserialization:
>>  a kafka consumer job runs stably when submitted to a Flink session
>> cluster, but reports a Kafka deserialization error when submitted on its own to a Flink per-job cluster; the error is below:
>>  Flink is 1.10 and Kafka is kafka_2.12-2.1.0; the consumer in the code is configured as val data =
>> env.addSource(new FlinkKafkaConsumer[String](topic, new
>> SimpleStringSchema(), properties))
>>  2020-05-27 17:05:22
>>  org.apache.kafka.common.KafkaException: Failed to construct kafka
>> consumer
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>>  at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>  at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>  at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>>  at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>  at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>>  at
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>  at java.lang.Thread.run(Thread.java:748)
>>  Caused by: org.apache.kafka.common.KafkaException:
>> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>> is not an instance of org.apache.kafka.common.serialization.Deserializer
>>  at
>> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>>  ... 15 more
>>
>>
>>
>> --
>> Best,
>> zz zhang
>
>


Re: Kafka Consumer deserialization error

2020-05-29 by tison
This should be a classloading-order problem; configure child-first classloading. For per-job on 1.10, I recall a specific config needs to be set.

See this doc
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath

Best,
tison.


Even <452232...@qq.com> wrote on Fri, May 29, 2020 at 6:48 PM:

> Thanks! How should I handle this to avoid the problem?
>
>
>
>
> ------ Original message ------
> From: "zz zhang"  Sent: Friday, May 29, 2020 5:16 PM
> To: "user-zh"  jkill...@dingtalk.com;
>
> Subject: Re: Kafka Consumer deserialization error
>
>
>
> It should be a maven-shade configuration problem:
>
> the original class file org.apache.kafka.common.serialization.ByteArrayDeserializer was rewritten to org.
> apache.flink.kafka.shaded.org
> .apache.kafka.common.serialization.ByteArrayDeserializer, and the Deserializer interface it implements was shaded as well, presumably to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer, hence the exception
>
> 夏帅  wrote:
>  Could it be a jar conflict? Worth ruling out.
> 
> 
>  --
>  From: Even <452232...@qq.com
>  Sent: Friday, May 29, 2020 16:17
>  To: user-zh   Subject: Kafka Consumer deserialization error
> 
>  Hi!
>  A question about Kafka Consumer deserialization:
>  a kafka consumer job runs stably when submitted to a Flink session
> cluster, but reports a Kafka deserialization error when submitted on its own to a Flink per-job cluster; the error is below:
>  Flink is 1.10 and Kafka is kafka_2.12-2.1.0; the consumer in the code is configured as val data =
> env.addSource(new FlinkKafkaConsumer[String](topic, new
> SimpleStringSchema(), properties))
>  2020-05-27 17:05:22
>  org.apache.kafka.common.KafkaException: Failed to construct kafka
> consumer
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>  at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>  at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>  at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>  at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>  at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.kafka.common.KafkaException:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> is not an instance of org.apache.kafka.common.serialization.Deserializer
>  at
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>  ... 15 more
>
>
>
> --
> Best,
> zz zhang


Re: Kafka Consumer deserialization error

2020-05-29 by Even
Thanks! How should I handle this to avoid the problem?


------ Original message ------
From: "zz zhang"

Re: flink-1.10.0 hive-1.2.1 No operators defined in streaming topology

2020-05-29 by kcz
Thanks!


------ Original message ------
From: "Benchao Li"

Re: flink-1.10.0 hive-1.2.1 No operators defined in streaming topology

2020-05-29 by Benchao Li
I'm not completely sure, but it looks like it is related to these two lines (the sqlQuery and the collect into rowList, quoted below).

kcz  wrote on Fri, May 29, 2020 at 3:48 PM:

> The code is below. The error says: Exception in thread "main" java.lang.IllegalStateException:
> No operators defined in streaming topology. Cannot execute. The insert actually succeeded; I verified in the hive
> cli, and the select also returned data. Why does it still tell me no operators are defined? Do I need a batch table? EnvironmentSettings
> settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>  TableEnvironment tableEnv = TableEnvironment.create(settings);
>
>  String name= "myhive";
>  String defaultDatabase = "situation";
>  String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
>  String version = "1.2.1";
>  String CATALOG_NAME = "myhive";
>
>  HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
>  hiveCatalog.open();
>  tableEnv.registerCatalog(CATALOG_NAME, hiveCatalog);
>
>  Optional<Catalog> myHive = tableEnv.getCatalog(CATALOG_NAME);
>  ObjectPath myTablePath = new ObjectPath("situation", "flink_test");
>  System.out.println(myHive.get().getTable(myTablePath).getSchema());
>
>
>  // integrate the Hive built-in functions
> tableEnv.loadModule("hiveModule",new HiveModule(version));
>
>  tableEnv.useCatalog(CATALOG_NAME);
>
>  tableEnv.sqlUpdate("insert into situation.flink_test values (3,'kcz3')");
>  Table table = tableEnv.sqlQuery(" select * from situation.flink_test");
>  List<Row> rowList = ...
>  System.out.println(rowList);
>
>
>  tableEnv.execute("test");



-- 

Best,
Benchao Li
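
A hedged sketch of the likely fix, under the assumption hinted at above: in the 1.10 blink batch planner, sqlUpdate only buffers the INSERT and tableEnv.execute() submits it, while collecting a query result triggers its own job; a trailing execute() after the collect therefore finds no operators left and throws exactly this IllegalStateException. Running the INSERT on its own avoids that:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class InsertThenQuery {
        public static void main(String[] args) throws Exception {
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner().inBatchMode().build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);
            // (catalog registration as in the quoted code is assumed here)

            tableEnv.sqlUpdate("insert into situation.flink_test values (3,'kcz3')");
            tableEnv.execute("insert flink_test"); // runs the buffered INSERT; no second execute()
        }
    }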


Re: Kafka Consumer deserialization error

2020-05-29 by zz zhang
It should be a maven-shade configuration problem:
the original class file org.apache.kafka.common.serialization.ByteArrayDeserializer was rewritten to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer, and the Deserializer interface it implements was shaded as well, presumably to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer, hence the exception

夏帅  wrote on Fri, May 29, 2020 at 4:33 PM:
>
> Could it be a jar conflict? Worth ruling out.
>
>
> --
> From: Even <452232...@qq.com>
> Sent: Friday, May 29, 2020 16:17
> To: user-zh 
> Subject: Kafka Consumer deserialization error
>
> Hi!
> A question about Kafka Consumer deserialization:
> a kafka consumer job runs stably when submitted to a Flink session cluster, but reports a Kafka 
> deserialization error when submitted on its own to a Flink per-job cluster; the error is below:
> Flink is 1.10 and Kafka is kafka_2.12-2.1.0; the consumer in the code is configured as val data = 
> env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), 
> properties))
> 2020-05-27 17:05:22
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>  at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>  is not an instance of org.apache.kafka.common.serialization.Deserializer
>  at 
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)
>  ... 15 more



-- 
Best,
zz zhang


Re: Flink SQL hbase connector question

2020-05-29 by 111
Hi,
We run HBase 1.2.0-cdh with the 1.4.3 hbase connector.
The jars involved:

flink-hbase_2.11-1.10.1.jar 

hbase-client-1.4.3.jar

hbase-common-1.4.3.jar

hbase-protocol-1.4.3.jar

Best,
Xinghalo

Re: Flink SQL hbase connector question

2020-05-29 by Leonard Xu
Hi,
(1) Yes.
(2) Yes, implementing it yourself works.

Best,
Leonard Xu

> On May 29, 2020 at 16:44, op <520075...@qq.com> wrote:
> 
> Hi all, I have two questions:
> 
> 
>   1. Does the existing hbase connector only support HBase version 1.4.3?
>   2. Can I write a custom connector for version 1.2.0?
> 
> 
> Thanks!



Flink SQL hbase connector question

2020-05-29 by op
Hi all, I have two questions:


  1. Does the existing hbase connector only support HBase version 1.4.3?
  2. Can I write a custom connector for version 1.2.0?


Thanks!

Re: Kafka Consumer deserialization error

2020-05-29 by 夏帅
Could it be a jar conflict? Worth ruling out.


--
From: Even <452232...@qq.com>
Sent: Friday, May 29, 2020 16:17
To: user-zh 
Subject: Kafka Consumer deserialization error

Hi!
A question about Kafka Consumer deserialization:
a kafka consumer job runs stably when submitted to a Flink session cluster, but reports a Kafka 
deserialization error when submitted on its own to a Flink per-job cluster; the error is below:
Flink is 1.10 and Kafka is kafka_2.12-2.1.0; the consumer in the code is configured as val data = 
env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), 
properties))
2020-05-27 17:05:22
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)

Kafka Consumer deserialization error

2020-05-29 by Even
Hi!
A question about Kafka Consumer deserialization:
a kafka consumer job runs stably when submitted to a Flink session 
cluster, but reports a Kafka deserialization error when submitted on its own to a Flink per-job cluster; 
the error is below:
Flink is 1.10 and Kafka is kafka_2.12-2.1.0; the consumer in the code is configured as val data = 
env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), 
properties))
2020-05-27 17:05:22
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(...)

flink-1.10.0 hive-1.2.1 No operators defined in streaming topology

2020-05-29 by kcz
The code is below. The error says: Exception in thread "main" 
java.lang.IllegalStateException: No operators defined in streaming topology. 
Cannot execute. The insert actually succeeded; I verified in the hive 
cli, and the select also returned data. Why does it still say no operators are defined? Do I need a batch 
table? EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
 TableEnvironment tableEnv = TableEnvironment.create(settings);

 String name= "myhive";
 String defaultDatabase = "situation";
 String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
 String version = "1.2.1";
 String CATALOG_NAME = "myhive";

 HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version);
 hiveCatalog.open();
 tableEnv.registerCatalog(CATALOG_NAME, hiveCatalog);

 Optional

Re: Flink access to a Hadoop cluster

2020-05-29 by 了不起的盖茨比
OK, got it. Thanks!




------ Original message ------
From: <13162790...@163.com>
Sent: Friday, May 29, 2020 3:20 PM
To: "user-zh"

Re: Flink access to a Hadoop cluster

2020-05-29 by 了不起的盖茨比
Thanks very much! I'll put
hdfs-site.xml under the resource directory.




------ Original message ------
From: "wangweigu...@stevegame.cn"

Re: JSON parsing UDF for Flink SQL

2020-05-29 by star
Thanks!



Sent from my iPhone


------ Original message ------
From: Benchao Li  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550

star <3149768...@qq.com> wrote on Fri, May 29, 2020 at 2:34 PM:

 Does Flink SQL 1.10 have a UDF for parsing JSON? I couldn't find one.

 Sent from my iPhone



-- 

Best,
Benchao Li

Re: Flink access to a Hadoop cluster

2020-05-29 by 程龙






Is the code below running locally? If so, the simplest approach is to put the hdfs-site.xml and core-site.xml config files into the resources directory.











On 2020-05-29 15:06:21, "了不起的盖茨比" <573693...@qq.com> wrote:
>A question for everyone:
>the Hadoop service is TestHACluster, but when I access it via the API with the path
>hdfs://TestHACluster/user/flink/test
>it goes to TestHACluster:8020, and I don't have that port. How should this be handled?
>Because of this, I also hit problems later when working with Hive: it cannot connect to TestHACluster:8020.
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();
>DataStream<String> input = ...;
>StreamingFileSink<String> sink = StreamingFileSink
>        .forRowFormat(new Path("hdfs://TestHACluster/user/flink/test"),
>                new SimpleStringEncoder<String>())
>        .withBucketAssigner(new DateTimeBucketAssigner<>())
>        .build();
>input.addSink(sink);
>env.execute();


Re: Flink access to a Hadoop cluster

2020-05-29 by wangweigu...@stevegame.cn
   Check whether your CDH HDFS port is 8020; you could change 
hdfs://TestHACluster/user/flink/test to 
hdfs://TestHACluster:8020/user/flink/test.
   For Flink to resolve the TestHACluster nameservice (HDFS HA), it needs the HA settings from hive-site.xml / hdfs-site.xml.


 
From: 了不起的盖茨比
Sent: 2020-05-29 15:06
To: user-zh
Subject: Flink access to a Hadoop cluster
A question for everyone:
the Hadoop service is TestHACluster, but when I access it via the API with the path 
hdfs://TestHACluster/user/flink/test
it goes to TestHACluster:8020, and I don't have that port. How should this be handled? 
Because of this, I also hit problems later when working with Hive: it cannot connect to TestHACluster:8020.
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream

Flink access to a Hadoop cluster

2020-05-29 by 了不起的盖茨比
A question for everyone:
the Hadoop service is TestHACluster, but when I access it via the API with the path 
hdfs://TestHACluster/user/flink/test
it goes to TestHACluster:8020, and I don't have that port. How should this be handled? 
Because of this, I also hit problems later when working with Hive: it cannot connect to TestHACluster:8020.
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream

Re: JSON parsing UDF for Flink SQL

2020-05-29 by Benchao Li
Hi,

This work is still in progress (DOING); see FLIP-90 [1]

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550

star <3149768...@qq.com> wrote on Fri, May 29, 2020 at 2:34 PM:

> Does Flink SQL 1.10 have a UDF for parsing JSON? I couldn't find one.
>
> Sent from my iPhone



-- 

Best,
Benchao Li
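
While FLIP-90 is pending, one workaround sketch (not a built-in; the class name and the Jackson dependency are assumptions) is a scalar UDF that extracts a top-level field from a JSON string:

    import java.io.IOException;

    import org.apache.flink.table.functions.ScalarFunction;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonGet extends ScalarFunction {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        public String eval(String json, String field) {
            if (json == null) {
                return null;
            }
            try {
                JsonNode node = MAPPER.readTree(json).get(field);
                return node == null ? null : node.asText();
            } catch (IOException e) {
                return null; // treat malformed JSON as missing
            }
        }
    }

Registered with tableEnv.registerFunction("JSON_GET", new JsonGet()), it can then be called as JSON_GET(payload, 'itemCode') in SQL.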


Re: Re: Flink SQL watermark question

2020-05-29 by wangweigu...@stevegame.cn

Your code: w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'yyyy-MM-dd HH:mm:ss')
The FROM_UNIXTIME function in there takes a BIGINT argument and returns a VARCHAR date value, with the default format yyyy-MM-dd 
HH:mm:ss.
TO_TIMESTAMP then converts that to the TIMESTAMP type, i.e. timestamp(3); this only works with the blink planner.

See the Alibaba Cloud reference: https://help.aliyun.com/knowledge_list_page/62717/1.html?spm=a2c4g.11186631.2.4.41933356drYMGX
 
which covers the Flink date functions.



 
From: Benchao Li
Sent: 2020-05-29 10:35
To: user-zh
Subject: Re: Re: Flink SQL watermark question
Flink supports declaring a Timestamp(3)-typed column as the event-time column and generating watermarks on it.
The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" you mentioned is not a data type; it is just one string representation of a Timestamp, mainly the format the json
format needs when parsing a string into a timestamp type.
 
So a field of any other type, e.g. varchar, long, int and so on, can be converted to timestamp(3) via built-in functions or a UDF, and the watermark can then be generated on top of that.
 
guaishushu1...@163.com  wrote on Fri, May 29, 2020 at 10:25 AM:
 
> Also, doesn't Flink only support parsing the "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" form into a watermark? That is what confuses me.
>
>
>
> guaishushu1...@163.com
>
> From: guaishushu1...@163.com
> Sent: 2020-05-29 10:20
> To: Benchao Li
> CC: user-zh
> Subject: Re: Re: Flink SQL watermark question
>
> What I mean is: my timestamp is a long, yet after converting it with TO_TIMESTAMP to 'yyyy-MM-dd HH:mm:ss' it can still generate a watermark.
>
>
> guaishushu1...@163.com
>
> From: Benchao Li
> Sent: 2020-05-28 17:00
> To: user-zh
> Subject: Re: Flink SQL watermark question
> Hi,
>
> I don't quite follow what the question is. Currently only Timestamp(3) is indeed supported as the event-time column.
> Long is not supported as an event-time column yet, mainly because of timezone concerns; but the community is considering it, see [1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-16938
>
> guaishushu1...@163.com  wrote on Thu, May 28, 2020 at 4:22 PM:
>
> > flink-1.10 SQL only supports generating watermarks on fields of type timestamp(3),
> > but a long converted like this can also generate a watermark, which is odd:
> > CREATE TABLE user_log (
> > response_size int,
> > rowtime BIGINT,
> > w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'yyyy-MM-dd HH:mm:ss'),
> > WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND -- 5 seconds of delay
> > )
> >
> >
> >
> > guaishushu1...@163.com
> >
>
>
> --
>
> Best,
> Benchao Li
>
 
 
-- 
 
Best,
Benchao Li


Re: Re: flink1.10 on yarn problem

2020-05-29 by wangweigu...@stevegame.cn

Regarding this error: >>> Caused by: org.apache.flink.runtime.JobException: Recovery is 
suppressed
>>> by NoRestartBackoffTimeStrategy

it probably means the flink-conf.yaml under the flink conf directory was not read; it contains the task failure-restart settings.
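
A hedged sketch of the flink-conf.yaml keys being referred to (Flink 1.10; the values are illustrative):

    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s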



 
From: air23
Sent: 2020-05-29 14:34
To: user-zh
Subject: Re: Re: flink1.10 on yarn problem
The code is just Flink's bundled example.
 
public class WordCountStreamingByJava {
public static void main(String[] args) throws Exception {
 
// create the execution environment
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// set up the socket data source
DataStreamSource<String> source = env.socketTextStream("zongteng75", 9001, 
"\n");

// transform the data
DataStream<WordWithCount> dataStream = source.flatMap(new 
FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String line, Collector<WordWithCount> collector) throws 
Exception {

System.out.println(line);
for (String word : line.split(" ")) {
collector.collect(new WordWithCount(word, 1));
}
}
}).keyBy("word")// group and count by key
.timeWindow(Time.seconds(2),Time.seconds(2))// define a window function to simulate the data flow
.sum("count");// count the words within each time window
 
// emit the results
dataStream.print();
 
// run the job
env.execute("Flink Streaming Word Count By Java");
 
}
 
 
 
 
After I added the Flink environment variables, this example now passes. Very strange. 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
On 2020-05-29 14:22:39, "tison"  wrote:
>Also, if possible, please share (e.g. via gist) the code snippets around execute, or even the whole main (x)
>
>Best,
>tison.
>
>
>tison  wrote on Fri, May 29, 2020 at 2:21 PM:
>
>> This problem is really odd; normally this would be caught at env.execute
>> time rather than after the job is actually scheduled. Could you describe in detail how you submit the job and where the error is reported (client? cluster?)?
>>
>> Best,
>> tison.
>>
>>
>> air23  wrote on Fri, May 29, 2020 at 1:38 PM:
>>
>>> Running flink1.10 on CDH YARN fails as below; version 1.7.2 has no problem.
>>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar was added, and
>>> the Hadoop env var: export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> Any help appreciated
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error:
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>>> (JobID: e358699c1be6be1472078771e1fd027f)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)
>>>
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>>
>>> at
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>>>
>>> Caused by: java.util.concurrent.ExecutionException:
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>>> (JobID: e358699c1be6be1472078771e1fd027f)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>
>>> at
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>>>
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>>
>>> at
>>> tt.WordCountStreamingByJava.main(WordCountStreamingByJava.java:36)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>
>>> ... 11 more
>>>
>>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>>> Job failed (JobID: e358699c1be6be1472078771e1fd027f)
>>>
>>> at
>>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>
>>> at
>>> 

Re: Re: flink1.10 on yarn problem

2020-05-29 by tison
What command do you run, from which directory, and how does that directory relate to where the Flink distribution was downloaded and unpacked?

Best,
tison.


air23  wrote on Fri, May 29, 2020 at 2:35 PM:

> The code is just Flink's bundled example.
>
> public class WordCountStreamingByJava {
> public static void main(String[] args) throws Exception {
>
> // create the execution environment
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the socket data source
> DataStreamSource<String> source = env.socketTextStream("zongteng75", 9001,
> "\n");
>
> // transform the data
> DataStream<WordWithCount> dataStream = source.flatMap(new
> FlatMapFunction<String, WordWithCount>() {
> @Override
> public void flatMap(String line, Collector<WordWithCount> collector)
> throws Exception {
>
> System.out.println(line);
> for (String word : line.split(" ")) {
> collector.collect(new WordWithCount(word, 1));
> }
> }
> }).keyBy("word")// group and count by key
> .timeWindow(Time.seconds(2),Time.seconds(2))// define a window function to simulate the data flow
> .sum("count");// count the words within each time window
>
> // emit the results
> dataStream.print();
>
> // run the job
> env.execute("Flink Streaming Word Count By Java");
>
> }
>
>
>
>
> After I added the Flink environment variables, this example now passes. Very strange.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-05-29 14:22:39, "tison"  wrote:
> >Also, if possible, please share (e.g. via gist) the code snippets around execute, or even the whole main (x)
> >
> >Best,
> >tison.
> >
> >
> >tison  wrote on Fri, May 29, 2020 at 2:21 PM:
> >
> >> This problem is really odd; normally this would be caught at env.execute
> >> time rather than after the job is actually scheduled. Could you describe in detail how you submit the job and where the error is reported (client? cluster?)?
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> air23  wrote on Fri, May 29, 2020 at 1:38 PM:
> >>
> >>> Running flink1.10 on CDH YARN fails as below; version 1.7.2 has no problem.
> >>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar was added, and
> >>> the Hadoop env var: export HADOOP_CONF_DIR=/etc/hadoop/conf
> >>> Any help appreciated
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> org.apache.flink.client.program.ProgramInvocationException: The main
> >>> method caused an error:
> >>> org.apache.flink.client.program.ProgramInvocationException: Job failed
> >>> (JobID: e358699c1be6be1472078771e1fd027f)
> >>>
> >>> at
> >>>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >>>
> >>> at
> >>>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >>>
> >>> at
> >>>
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
> >>>
> >>> at
> >>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >>>
> >>> at java.security.AccessController.doPrivileged(Native Method)
> >>>
> >>> at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>
> >>> at
> >>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>>
> >>> at
> >>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >>>
> >>> Caused by: java.util.concurrent.ExecutionException:
> >>> org.apache.flink.client.program.ProgramInvocationException: Job failed
> >>> (JobID: e358699c1be6be1472078771e1fd027f)
> >>>
> >>> at
> >>>
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >>>
> >>> at
> >>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> >>>
> >>> at
> >>>
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
> >>>
> >>> at
> >>>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> >>>
> >>> at
> >>> tt.WordCountStreamingByJava.main(WordCountStreamingByJava.java:36)
> >>>
> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>
> >>> at
> >>>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>
> >>> at
> >>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>
> >>> at java.lang.reflect.Method.invoke(Method.java:498)
> >>>
> >>> at
> >>>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> >>>
> >>> ... 11 more
> >>>
> >>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
> >>> Job failed (JobID: e358699c1be6be1472078771e1fd027f)
> >>>
> >>> at
> >>>
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> >>>
> >>> at
> >>>
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> >>>
> >>> at
> >>>
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> >>>
> >>>  

Re: Re: flink1.10 on yarn problem

2020-05-29 by air23
The code is just Flink's bundled example.

public class WordCountStreamingByJava {
public static void main(String[] args) throws Exception {

// create the execution environment
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// set up the socket data source
DataStreamSource<String> source = env.socketTextStream("zongteng75", 9001, 
"\n");

// transform the data
DataStream<WordWithCount> dataStream = source.flatMap(new 
FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String line, Collector<WordWithCount> collector) throws 
Exception {

System.out.println(line);
for (String word : line.split(" ")) {
collector.collect(new WordWithCount(word, 1));
}
}
}).keyBy("word")// group and count by key
.timeWindow(Time.seconds(2),Time.seconds(2))// define a window function to simulate the data flow
.sum("count");// count the words within each time window

// emit the results
dataStream.print();

// run the job
env.execute("Flink Streaming Word Count By Java");

}




After I added the Flink environment variables, this example now passes. Very strange. 

















On 2020-05-29 14:22:39, "tison"  wrote:
>Also, if possible, please share (e.g. via gist) the code snippets around execute, or even the whole main (x)
>
>Best,
>tison.
>
>
>tison  wrote on Fri, May 29, 2020 at 2:21 PM:
>
>> This problem is really odd; normally this would be caught at env.execute
>> time rather than after the job is actually scheduled. Could you describe in detail how you submit the job and where the error is reported (client? cluster?)?
>>
>> Best,
>> tison.
>>
>>
>> air23  wrote on Fri, May 29, 2020 at 1:38 PM:
>>
>>> Running flink1.10 on CDH YARN fails as below; version 1.7.2 has no problem.
>>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar was added, and
>>> the Hadoop env var: export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> Any help appreciated
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error:
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>>> (JobID: e358699c1be6be1472078771e1fd027f)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)
>>>
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>>
>>> at
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>>>
>>> Caused by: java.util.concurrent.ExecutionException:
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>>> (JobID: e358699c1be6be1472078771e1fd027f)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>
>>> at
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>>>
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>>
>>> at
>>> tt.WordCountStreamingByJava.main(WordCountStreamingByJava.java:36)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>
>>> ... 11 more
>>>
>>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>>> Job failed (JobID: e358699c1be6be1472078771e1fd027f)
>>>
>>> at
>>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>>
>>> at
>>> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>
>>>   

flink sql的json解析udf

2020-05-29 文章 star
请问,flink sql1.10有解析json的udf吗?没有找到

发自我的iPhone

Re: Re: flink1.10 on yarn problem

2020-05-29 by air23



Hello, it's on the cluster side; the local code has no error. The error message is pasted below.
flink1.7 works fine.
Later I added the Flink environment variables:
#flink 
export FLINK_HOME=/opt/module/flink-1.10.1
export PATH=${FLINK_HOME}/bin:$PATH
and the failing example above now runs fine.


But another job, which works on 1.7 and locally, fails as follows:

 The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Could not deploy Yarn job cluster.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not deploy Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:398)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1733)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at com.zongteng.ztstream.etl.MongoToKafka.main(MongoToKafka.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1590715263014_0033 failed 1 
times due to AM Container for appattempt_1590715263014_0033_01 exited with  
exitCode: 2
For more detailed output, check application tracking 
page:http://zongteng72:8088/proxy/application_1590715263014_0033/Then, click on 
links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1590715263014_0033_01_01
Exit code: 2
Stack trace: ExitCodeException exitCode=2: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
at org.apache.hadoop.util.Shell.run(Shell.java:507)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)




Container exited with a non-zero exit code 2
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:
yarn logs -applicationId application_1590715263014_0033
at 
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:999)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:488)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:391)
  

Re: flink1.10 on yarn problem

2020-05-29 by tison
Also, if possible, please share (e.g. via gist) the code snippets around execute, or even the whole main (x)

Best,
tison.


tison  wrote on Fri, May 29, 2020 at 2:21 PM:

> This problem is really odd; normally this would be caught at env.execute
> time rather than after the job is actually scheduled. Could you describe in detail how you submit the job and where the error is reported (client? cluster?)?
>
> Best,
> tison.
>
>
> air23  wrote on Fri, May 29, 2020 at 1:38 PM:
>
>> Running flink1.10 on CDH YARN fails as below; version 1.7.2 has no problem.
>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar was added, and
>> the Hadoop env var: export HADOOP_CONF_DIR=/etc/hadoop/conf
>> Any help appreciated
>>
>>
>>
>>
>>
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: e358699c1be6be1472078771e1fd027f)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: e358699c1be6be1472078771e1fd027f)
>>
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>
>> at
>> tt.WordCountStreamingByJava.main(WordCountStreamingByJava.java:36)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>
>> ... 11 more
>>
>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> Job failed (JobID: e358699c1be6be1472078771e1fd027f)
>>
>> at
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>>
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>>
>> at
>> 

pyflink Table API: connecting to external systems

2020-05-29 by 刘亚坤
I'm currently learning the pyflink Table API and would like to ask:
1. When the Table API connects to a Kafka system, can the whole Kafka message be treated as a single table field? For example, if the message in the Kafka
topic is a JSON string, treating the entire string as one field would make it convenient to transform the message with pyflink UDFs.
2. If that is feasible, how should the data format of the Kafka connection be set, i.e. how should with_format be configured? The official docs currently have little material on this.


I'm a beginner, so any guidance would be appreciated. Thanks.
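
One possible pattern, shown here as a hedged Java sketch for illustration (topic and field names are made up, and this is not the only option): read each Kafka record as a whole String via SimpleStringSchema, then register it as a one-column table that UDFs can process.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class WholeMessageAsField {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            // each record arrives as one String: the entire (e.g. JSON) payload
            DataStream<String> raw = env.addSource(
                    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));

            // expose the whole message as a single column named `payload`
            tEnv.createTemporaryView("raw_events", raw, "payload");
            tEnv.sqlQuery("SELECT payload FROM raw_events");
        }
    }

Whether pyflink 1.10 can express the same thing purely through with_format is left open here; the Python DataStream API only arrived in later releases.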