Re:回复:Flink 1.11 SQL可以支持kafka动态分区发现么?

2021-01-19 文章 sunfulin

我看下这个源码,貌似是有这个参数。还不确定SQL ddl里配置会不会生效,回头验证下。
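For reference, the setting in question lives in FlinkKafkaConsumerBase under the key flink.partition-discovery.interval-millis, and the 1.11 Kafka SQL connector forwards options prefixed with properties. to the underlying consumer, so a DDL along the following lines is what would need to be verified. Connection values are placeholders, and whether the pass-through really enables discovery in 1.11 is exactly the open question of this thread; 1.12 instead documents the scan.topic-partition-discovery.interval option.

CREATE TABLE kafka_source (
  user_id STRING,
  behavior STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'mytest',
  -- 1.11: forwarded into the Kafka consumer properties read by FlinkKafkaConsumerBase
  'properties.flink.partition-discovery.interval-millis' = '30000',
  -- 1.12+: the documented connector option
  -- 'scan.topic-partition-discovery.interval' = '30s',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
)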




--
发自我的网易邮箱手机智能版



- Original Message -
From: "Shuai Xia" 
To: user-zh , sunfulin0321 
Sent: Wed, 20 Jan 2021 14:42:36 +0800
Subject: 回复:Flink 1.11 SQL可以支持kafka动态分区发现么?

Hi,看下FlinkKafkaConsumerBase内有没有使用,有的话就是支持的


--
发件人:sunfulin 
发送时间:2021年1月20日(星期三) 14:40
收件人:user-zh 
主 题:Flink 1.11 SQL可以支持kafka动态分区发现么?


hi,
各位大神,请教下,1.11的sql作业,如何能实现动态分区发现呐?我在1.12的文档里发现有个参数可以设置,但是1.11的版本里貌似没有。想确认下能否支持?




--
发自我的网易邮箱手机智能版

Flink 1.11 SQL可以支持kafka动态分区发现么?

2021-01-19 文章 sunfulin

hi,
各位大神,请教下,1.11的sql作业,如何能实现动态分区发现呐?我在1.12的文档里发现有个参数可以设置,但是1.11的版本里貌似没有。想确认下能否支持?




--
发自我的网易邮箱手机智能版

Flink 1.10在udf中传入array类型的解析异常

2021-01-05 文章 sunfulin
hi,
我遇到一个问题,消费的source里有字段定义为array<row<...>>这种类型,然后想通过一个udf将它处理成一个字符串。udf的入参定义如下:


public String eval(Row[] item, String char1, String char2);


但是在函数处理时,debug发现拿到的item里的row信息始终为null。也通过DataTypeHint注解给出了item的实际类型。这是不是1.10的bug呀?如果有相关的issue单的话,烦请有知道的发我下哈。
我在1.11里验证同样的逻辑,是没这个问题的。
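For anyone hitting the same issue, a minimal sketch of such a UDF under the 1.11 type inference, with the array-of-row argument annotated explicitly. The ROW field names a/b and the class name are placeholders, not taken from the original job.

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class RowArrayToString extends ScalarFunction {

    // Joins every ROW in the array into one string: sep1 between rows, sep2 between fields.
    public String eval(
            @DataTypeHint("ARRAY<ROW<a STRING, b STRING>>") Row[] items,
            String sep1,
            String sep2) {
        if (items == null) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        for (Row r : items) {
            if (sb.length() > 0) {
                sb.append(sep1);
            }
            sb.append(r.getField(0)).append(sep2).append(r.getField(1));
        }
        return sb.toString();
    }
}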

Re:Re: flink cep超时事件的问题

2020-10-29 文章 sunfulin



hi,
session window能处理这种超时事件么?不知道有没有例子可以参考参考哈。














在 2020-10-30 11:12:55,"naisili Yuan"  写道:
> 不知道理解错没有, 感觉你这个场景使用session windows能解决
>
>sunfulin  于2020年10月30日周五 上午11:01写道:
>
>> hi,community,
>> 我最近有一个业务场景,需要基于消息流和具体的业务逻辑判断生成超时事件,考虑通过flink
>> cep来实现。不过在这个场景中,需要针对输入的消息,判断如果一个小时内没有匹配的数据到来,就需要把该事件输出。
>> 目前的cep机制,应该需要下一个事件消息到来时才会输出事件。想请教下各位大神,针对这个诉求有没有啥好的方案。
>> 感谢。


flink cep超时事件的问题

2020-10-29 文章 sunfulin
hi,community,
我最近有一个业务场景,需要基于消息流和具体的业务逻辑判断生成超时事件,考虑通过flink 
cep来实现。不过在这个场景中,需要针对输入的消息,判断如果一个小时内没有匹配的数据到来,就需要把该事件输出。
目前的cep机制,应该需要下一个事件消息到来时才会输出事件。想请教下各位大神,针对这个诉求有没有啥好的方案。
感谢。
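Neither the CEP route nor the session-window suggestion is worked out in this thread; for completeness, this requirement is also commonly implemented without CEP, as a KeyedProcessFunction that (re)registers a one-hour processing-time timer per key and emits the timeout event in onTimer. A sketch under that assumption follows; Event and TimeoutEvent are placeholder types.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class OneHourTimeoutFunction extends KeyedProcessFunction<String, Event, TimeoutEvent> {

    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timeout-timer", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<TimeoutEvent> out) throws Exception {
        // A matching event arrived: cancel the pending timer and start a new one-hour countdown.
        Long pending = timerState.value();
        if (pending != null) {
            ctx.timerService().deleteProcessingTimeTimer(pending);
        }
        long fireAt = ctx.timerService().currentProcessingTime() + 60 * 60 * 1000L;
        ctx.timerService().registerProcessingTimeTimer(fireAt);
        timerState.update(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<TimeoutEvent> out) throws Exception {
        // No matching event within one hour: emit the timeout event for this key.
        out.collect(new TimeoutEvent(ctx.getCurrentKey(), timestamp));
        timerState.clear();
    }
}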

不使用minibatch时状态丢失的异常

2020-08-15 文章 sunfulin
hi,
我执行如下的sql场景:select userId, first_value(xxx) from source group by userId, 
date_format(eventtime, 'yyyy-MM-dd')
在不使用minibatch时,出现状态丢失的情况,同一个userId同一天输出了多条记录。这种可能是bug么?使用的flink 版本为1.10.1

flink state ttl状态清理和重新计算的疑问

2020-08-14 文章 sunfulin
hi,community,
想确认下idlestateretention的配置及生效机制,我有一个作业,设置了TTL为(10小时,10小时+5分钟)。假设我的作业是今天12:00启动,作业逻辑是统计当日10点后每个userId第一次登陆的时间:select
  userId, first_value(xxx) from source group by userId, date_format(eventtime, 
'yyyy-MM-dd')。那么我的作业的状态清理时机是从启动时间开始10小时之后么?还是会按照状态的数据更新的时间+10小时作为清理时间?
我使用flink 1.10.1版本,初步观察到的现象是,启动时间开始大概10小时后,状态开始清理。这个感觉不符合预期?求大佬帮忙确认下。

Re:Re: flink sql作业state size一直增加

2020-08-14 文章 sunfulin
hi, benchao,
感谢回复,那我是不是可以理解为:去掉minibatch,就可以状态过期清理了哈?
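For reference, the "minibatch" being discussed maps to the Blink planner options below; a short sketch of switching it off through the table config (tEnv here is assumed to be any TableEnvironment):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfig {
    public static void disableMiniBatch(TableEnvironment tEnv) {
        Configuration conf = tEnv.getConfig().getConfiguration();
        // Disabled by default; listed explicitly to show what "去掉minibatch" refers to.
        conf.setString("table.exec.mini-batch.enabled", "false");
        // Only relevant when mini-batch is enabled:
        // conf.setString("table.exec.mini-batch.allow-latency", "1 s");
        // conf.setString("table.exec.mini-batch.size", "1000");
    }
}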

















在 2020-08-14 14:09:33,"Benchao Li"  写道:
>Hi,
>现在group agg + mini batch 还没有支持状态过期清理,已经有工作[1] 在解决这个问题了。
>
>[1] https://issues.apache.org/jira/browse/FLINK-17096
>
>sunfulin  于2020年8月14日周五 下午2:06写道:
>
>> hi,我的一个flink sql作业,在启用了idlestateretentiontime设置后,观察到web ui上的state
>> size还是一直在增大,超过maximum retention time之后state大小也没有减小的情况,请问这个可能是啥原因哈?
>>
>>
>> 使用的flink 版本:flink 1.10.1,启用的state
>> ttl配置:tableEnv.getConfig.setIdleStateRetentionTime(Time.minutes(5),
>> Time.minutes(10));
>> 我的作业逻辑是:统计每个userId每天第一次出现的记录,类似:select userId, first_value(xxx) from
>> >source group by userId, date_format(eventtime, 'yyyy-MM-dd');
>
>
>
>-- 
>
>Best,
>Benchao Li


flink sql作业state size一直增加

2020-08-14 文章 sunfulin
hi,我的一个flink sql作业,在启用了idlestateretentiontime设置后,观察到web ui上的state 
size还是一直在增大,超过maximum retention time之后state大小也没有减小的情况,请问这个可能是啥原因哈?


使用的flink 版本:flink 1.10.1,启用的state 
ttl配置:tableEnv.getConfig.setIdleStateRetentionTime(Time.minutes(5), 
Time.minutes(10));
我的作业逻辑是:统计每个userId每天第一次出现的记录,类似:select userId, first_value(xxx) from source 
group by userId, date_format(eventtime, 'yyyy-MM-dd');

Re:Re: Re:Re: flink 1.11任务提交的问题

2020-07-17 文章 sunfulin



hi,
再问下,这个方案还是会提交两个job吧?














在 2020-07-17 14:36:19,"godfrey he"  写道:
>做不到,1.11里把 StreamExecutionEnvironment.execute 和
>StreamTableEnvironment.execute 的逻辑已经切分干净了。
>有个改动比较小的方案可以参考:可以在原来的逻辑的基础上,把两种提交job的方式放到两个不同的类中,其他的逻辑放到另外一个类共性。
>
>sunfulin  于2020年7月17日周五 下午2:00写道:
>
>> hi,
>> 补充一下,1.10版本的代码使用sqlUpdate +
>> table2datastream,并通过StreamExecutionEnvironment.execute来提交。我回滚到1.10版本的代码后,因为我看1.11版本里如果使用sqlUpdate执行insertInto,必须使用StreamTableEnvironment.execute来提交。现在我的问题就是这个:我想通过一个job来提交。现在有机制可以做不?在1.11版本里执行。因为之前的job逻辑较为复杂,做拆分的话还有点麻烦。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-17 13:55:21,"sunfulin"  写道:
>>
>>
>>
>>
>> hi,
>> 感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
>> to DataStream的语句不会生成拓扑。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-17 12:09:20,"godfrey he"  写道:
>> >hi sunfulin,
>> >目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
>> >即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
>> >虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
>> >
>> >Best,
>> >Godfrey
>> >
>> >Leonard Xu  于2020年7月17日周五 上午12:12写道:
>> >
>> >> Hi,
>> >>
>> >> 我理解目前好像做不到, cc: godfrey 大佬看看
>> >>
>> >> 祝好,
>> >> Leonard Xu
>> >>
>> >> > 在 2020年7月16日,23:08,sunfulin  写道:
>> >> >
>> >> > hi,
>> >> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
>> >> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
>> >> > 通过StreamExecutionEnvironment.execute提交,yarn
>> >> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>> >>
>> >>
>>
>>
>>
>>
>>
>>


Re:Re:Re: flink 1.11任务提交的问题

2020-07-17 文章 sunfulin
hi,
补充一下,1.10版本的代码使用sqlUpdate + 
table2datastream,并通过StreamExecutionEnvironment.execute来提交。我回滚到1.10版本的代码后,因为我看1.11版本里如果使用sqlUpdate执行insertInto,必须使用StreamTableEnvironment.execute来提交。现在我的问题就是这个:我想通过一个job来提交。现在有机制可以做不?在1.11版本里执行。因为之前的job逻辑较为复杂,做拆分的话还有点麻烦。
















在 2020-07-17 13:55:21,"sunfulin"  写道:




hi,
感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
 to DataStream的语句不会生成拓扑。











在 2020-07-17 12:09:20,"godfrey he"  写道:
>hi sunfulin,
>目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
>即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
>虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
>
>Best,
>Godfrey
>
>Leonard Xu  于2020年7月17日周五 上午12:12写道:
>
>> Hi,
>>
>> 我理解目前好像做不到, cc: godfrey 大佬看看
>>
>> 祝好,
>> Leonard Xu
>>
>> > 在 2020年7月16日,23:08,sunfulin  写道:
>> >
>> > hi,
>> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
>> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
>> > 通过StreamExecutionEnvironment.execute提交,yarn
>> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>>
>>





 

Re:Re: flink 1.11任务提交的问题

2020-07-16 文章 sunfulin



hi,
感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
 to DataStream的语句不会生成拓扑。











在 2020-07-17 12:09:20,"godfrey he"  写道:
>hi sunfulin,
>目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
>即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
>虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
>
>Best,
>Godfrey
>
>Leonard Xu  于2020年7月17日周五 上午12:12写道:
>
>> Hi,
>>
>> 我理解目前好像做不到, cc: godfrey 大佬看看
>>
>> 祝好,
>> Leonard Xu
>>
>> > 在 2020年7月16日,23:08,sunfulin  写道:
>> >
>> > hi,
>> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
>> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
>> > 通过StreamExecutionEnvironment.execute提交,yarn
>> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>>
>>


flink 1.11任务提交的问题

2020-07-16 文章 sunfulin
hi,
请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql 
dml提交(executeSQL执行),又通过DataStream.addSink来写出,
通过StreamExecutionEnvironment.execute提交,yarn 
per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。

Re:Re: flink 1.11 sql类型问题

2020-07-15 文章 sunfulin



hi, leonard
感谢回复。我在es的ddl with参数里加了这个,貌似还是报错。我再简单描述下我的场景:
我的es sink的ddl如下:
create table es_sink (
  a varchar,
  b varchar,
  c TIMESTAMP(9) WITH LOCAL TIME ZONE
) with (
  
)


我使用处理时间属性,将流里的proctime转成UTC格式的日期类型,作为c这个字段写入。现在能原生支持么?之前在1.10版本貌似是可以直接写的。但是到1.11写的不带时区了,导致不能兼容之前的格式。
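For reference, a sketch of where that option goes in the 1.11 DDL; the column is declared as TIMESTAMP(3) here because the exception above is specifically about TIMESTAMP(9) WITH LOCAL TIME ZONE not being handled by the JSON serializer. Whether this reproduces the exact trailing-Z output of 1.10 is the open question of this thread. Connection values are placeholders.

create table es_sink (
  a varchar,
  b varchar,
  c TIMESTAMP(3)
) with (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'es_sink',
  'document-type' = 'default',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)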














在 2020-07-16 09:40:06,"Leonard Xu"  写道:
>Hello
>
>json解析UTC时间是支持的,你with参数里指定下json中timestamp的类型试下, json.timestamp-format.standard 
>= 'ISO-8601'
>
>Best
>Leonard Xu
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> 
><https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard>
>
>> 在 2020年7月15日,23:19,sunfulin  写道:
>> 
>> hi,
>> 我通过flink sql 定义了一个es sink,其中有个字段类型定义为了 eventTime TIMESTAMP(9) WITH LOCAL 
>> TIME ZONE。
>> 在尝试写入时,报了如下的异常。看来json parser无法解析这种类型。请问下大神们,我应该怎么写入一个UTC日期的时间类型?格式类似 
>> 2020-07-15T12:00:00.000Z 
>> 
>> 
>> 
>> java.lang.UnsupportedOperationException: Not support to parse type: 
>> TIMESTAMP(9) WITH LOCAL TIME ZONE
>> 
>> at 
>> org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-07-15 21:24:30,"sunfulin"  写道:
>>> hi,
>>> 我看1.11的java.sql.Timestamp 
>>> 对应的是Flink的TIMESTAMP(9),跟之前默认的TIMESTAMP(3)有区别,而且之前1.10的Timestamp(3)是带时区UTC的,现在这个类型不带时区了。想问下这个具体调整应该如何适配?
>


Re:flink 1.11 sql类型问题

2020-07-15 文章 sunfulin
hi,
我通过flink sql 定义了一个es sink,其中有个字段类型定义为了 eventTime TIMESTAMP(9) WITH LOCAL TIME 
ZONE。
在尝试写入时,报了如下的异常。看来json parser无法解析这种类型。请问下大神们,我应该怎么写入一个UTC日期的时间类型?格式类似 
2020-07-15T12:00:00.000Z 



java.lang.UnsupportedOperationException: Not support to parse type: 
TIMESTAMP(9) WITH LOCAL TIME ZONE

at 
org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)











在 2020-07-15 21:24:30,"sunfulin"  写道:
>hi,
>我看1.11的java.sql.Timestamp 
>对应的是Flink的TIMESTAMP(9),跟之前默认的TIMESTAMP(3)有区别,而且之前1.10的Timestamp(3)是带时区UTC的,现在这个类型不带时区了。想问下这个具体调整应该如何适配?


flink 1.11 sql类型问题

2020-07-15 文章 sunfulin
hi,
我看1.11的java.sql.Timestamp 
对应的是Flink的TIMESTAMP(9),跟之前默认的TIMESTAMP(3)有区别,而且之前1.10的Timestamp(3)是带时区UTC的,现在这个类型不带时区了。想问下这个具体调整应该如何适配?

Re:Re: flink 双流join报错,java.lang.AssertionError

2020-07-14 文章 sunfulin
hi, 
 @Danny Chan 
我在1.10版本中确实触发到了这个bug,切到1.11版本貌似就没这问题了。简单解释下问题:双流join的case,右边流join后的结果字段在获取时貌似乱序了。


















在 2020-07-13 10:42:12,"Jark Wu"  写道:
>cc @Danny Chan   也许 Danny 老师知道。
>
>On Thu, 9 Jul 2020 at 17:29, sunfulin  wrote:
>
>>
>> hi,
>> 我切到最新的1.11 release版本,跑同样的sql,没有抛出异常。想问下这有相关的issue么?想确认下原因。
>>
>>
>>
>>
>>
>>
>> 在 2020-07-09 16:53:34,"sunfulin"  写道:
>> >hi,
>> >我使用flink 1.10.1 
>> >blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select
>> > 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么?
>> >
>> >
>> >select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, 
>> >A.fund_name as product_name, cast(B.balance as double) as balance
>> >from (
>> >select toLong(behaviorTime, true) as recvTime, user_id,
>> >cast(regexp_extract(btnTitle, 'zbid=\{([^|]*)\}', 1) as int) as live_id,
>> >regexp_extract(btnTitle, 'fundname=\{([^|]*)\}', 1) as fund_name,
>> >regexp_extract(btnTitle, 'fundcode=\{([^|]*)\}', 1) as fund_code, proctime 
>> >from kafka_zl_etrack_event_stream
>> >where pageId = ''
>> >and eventId = 'click'
>> >and btnId = '
>> >and CHARACTER_LENGTH(user_id) > 4
>> >) A
>> >left join
>> >(
>> >select customerNumber, balance, fundCode, lastUpdateTime, proctime
>> >  from lscsp_sc_order_all
>> >   where `status` = '4'
>> > and businessType IN ('4','5','14','16','17','18')
>> > and fundCode IS NOT NULL
>> > and balance IS NOT NULL
>> > and lastUpdateTime IS NOT NULL
>> >) B
>> >on A.user_id = B.customerNumber and A.fund_code = B.fundCode
>> >group by  A.recvTime, A.user_id, A.live_id, A.fund_code, A.fund_name, 
>> >cast(B.balance as double)
>> >
>> >
>> >
>> >
>> >
>> >
>> >Exception in thread "main" java.lang.AssertionError
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.getRootField(SqlToRelConverter.java:4448)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter.adjustInputRef(SqlToRelConverter.java:3765)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3737)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
>> >at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
>> >at 
>> >org.apache.calcite.sql2rel.StandardConvertletTable.convertCast(StandardConvertletTable.java:522)
>> >at 
>> >org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)
>>
>>
>>
>>
>>


Re:Re: Re: flink 1.11 es未定义pk的sink问题

2020-07-13 文章 sunfulin
hi,YangZe,Leonard,
我增加了一个可以复现问题的测试类,可以执行下看看。可以明显观察到,两个sink在有PK时写入正常,在没有PK时只有一条记录(id是索引名)。



import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.StatementSet;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.types.Row;




import static org.apache.flink.table.api.Expressions.$;




public class ESNewJobTest {




//构建StreamExecutionEnvironment

public static final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();




//构建EnvironmentSettings 并指定Blink Planner

private static final EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();




//构建StreamTableEnvironment

public static final StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env, bsSettings);




//DDL语句

public static final String ES_SINK_DDL_NO_PK = "CREATE TABLE 
es_sink_test_no_pk (\n" +

"  idx integer,\n" +

"  firstx varchar\n" +

") WITH (\n" +

"'connector' = 'elasticsearch-6',\n" +

"'hosts' = '168.61.113.171:9200',\n" +

"'index' = 'es_sink_test_no_pk',\n" +

"'document-type' = 'default',\n" +

"'document-id.key-delimiter' = '$',\n" +

"'sink.bulk-flush.interval' = '1000',\n" +

"'failure-handler' = 'fail',\n" +

"'format' = 'json'\n" +

")";

public static final String ES_SINK_DDL_WITH_PK = "CREATE TABLE 
es_sink_test_with_pk (\n" +

"  idx integer,\n" +

"  firstx varchar,\n" +

"  primary key (idx, firstx) not enforced\n" +

") WITH (\n" +

"'connector' = 'elasticsearch-6',\n" +

"'hosts' = '168.61.113.171:9200',\n" +

"'index' = 'es_sink_test_with_pk',\n" +

"'document-type' = 'default',\n" +

"'document-id.key-delimiter' = '$',\n" +

"'sink.bulk-flush.interval' = '1000',\n" +

"'failure-handler' = 'fail',\n" +

"'format' = 'json'\n" +

")";




public static String getCharAndNumr(int length) {

StringBuffer valSb = new StringBuffer();

for (int i = 0; i < length; i++) {

String charOrNum = Math.round(Math.random()) % 2 == 0 ? "char" : 
"num"; // 输出字母还是数字

if ("char".equalsIgnoreCase(charOrNum)) {

// 字符串

int choice = Math.round(Math.random()) % 2 == 0 ? 65 : 97;  // 
取得大写字母还是小写字母

valSb.append((char) (choice + Math.round(Math.random()*25)));

} else if ("num".equalsIgnoreCase(charOrNum)) {

// 数字

valSb.append(String.valueOf(Math.round(Math.random()*9)));

}

}

return valSb.toString();




}




public static void main(String[] args) throws Exception {




DataStream<Row> ds = env.addSource(new 
RichParallelSourceFunction<Row>() {




volatile boolean flag = true;




@Override

public void run(SourceContext<Row> ctx) throws Exception {

while (flag) {

Row row = new Row(2);

row.setField(0, 2207);

row.setField(1, getCharAndNumr(4));

ctx.collect(row);

Thread.sleep(1000);

}




}




@Override

public void cancel() {

flag = false;

}

}).setParallelism(1).returns(Types.ROW(Types.INT, Types.STRING));







//ES sink测试ddl

tEnv.executeSql(ES_SINK_DDL_NO_PK);

tEnv.executeSql(ES_SINK_DDL_WITH_PK);




//source注册成表

tEnv.createTemporaryView("test", ds, $("f0").as("idx"), 
$("f1").as("firstx"), $("p").proctime());




//sink写入

StatementSet ss = tEnv.createStatementSet();

ss.addInsertSql("insert into es_sink_test_no_pk select idx, firstx from 
test");

ss.addInsertSql("insert into es_sink_test_with_pk select idx, firstx 
from test");

ss.execute();

}

}





在 2020-07-13 14:03:21,"Yangze Guo"  写道:
>INSERT走的就是processUpsert这个方法,当不指定PK时,生成的key会是null,然后创建一个Ind

Re:Re: flink 1.11 sql作业提交JM报错

2020-07-13 文章 sunfulin
hi,
感谢详细的解释和回复。那问题就清楚了。之前我们的job提交框架里统一都使用了StreamExecutionEnvironment.execute(jobName)方法,现在基于这个解释就明白了。

















在 2020-07-12 22:55:34,"godfrey he"  写道:
>hi sunfulin,
>
>1.11 对 StreamTableEnvironment.execute()
>和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
>简单概述为:
>1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
>2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
>3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
>(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
>或 StreamExecutionEnvironment.execute()
>
>详细可以参考 [1] [2]
>
>
>
>对于 “No operators defined in streaming topology.”,如果使用
>TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用
>StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()
>提交作业,就会出现前面的错误。
>
>对于
>“是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗?
>
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
>
>Best,
>Godfrey
>
>Leonard Xu  于2020年7月12日周日 下午1:48写道:
>
>> HI, fulin
>>
>> 能大致贴下代码吗?能复现异常即可。简单说下这两个方法,
>>  TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是
>> DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink
>> job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …)
>> 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。
>> Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法,
>> 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert
>> tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”),
>> TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job,
>> 这应该不是用户需要的。
>> 具体使用根据你的需要来使用。
>>
>>
>> Best,
>> Leonard Xu
>>
>>
>> 在 2020年7月11日,22:24,sunfulin  写道:
>>
>> statementset.execute
>>
>>
>>


Re:Re: flink 1.11 es未定义pk的sink问题

2020-07-13 文章 sunfulin



hi, Leonard
我定义了一个ddl和一个dml,sql如下。ddl中没有定义PK,我观察到的现象是:这样在sink到es结果中,结果生成的id是index名,导致只有一条记录。
我将DDL更换为之前版本的with参数(声明使用update-mode = 
‘upsert’),不使用1.11最新的with参数,观察到sink结果就正常了。不确定是不是我哪边配置的方式不太对,还是说使用方式有问题。


 @karma...@gmail.com  我看了下给的源代码,貌似这个是处理upsert的情况,如果不声明pk的话,是不是会是processInsert?



CREATE TABLE ES6_SENSORDATA_SERVER_API (
  event varchar,
  user_id varchar,
  distinct_id varchar,
  _date varchar,
  _event_time varchar,
  recv_time varchar,
  code varchar,
  _current_project varchar,
  api varchar,
  elapsed int ,
  `start` bigint,
  is_err int
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = '',
'index' = 'flink_sensordata_server_api',
'document-type' = 'default',
'document-id.key-delimiter' = '$',
'sink.bulk-flush.interval' = '1000',
'failure-handler' = 'fail',
'format' = 'json'
)







INSERT INTO ES6_SENSORDATA_SERVER_API

SELECT event,

   user_id,

   distinct_id,

   ts2Date(`time`, 'yyyy-MM-dd') as _date,

   ts2Date(`time`, 'yyyy-MM-dd HH:mm:ss.SSS') as _event_time,

   ts2Date(recv_time, false, false) as recv_time,

   properties.code as code,

   properties.`project` as _current_project,

   properties.api as api,

   properties.elapsed as elapsed,

   properties.`start` as `start`,

   case when properties.code = '0' then 0 else 1 end as is_err

FROM KafkaEventTopic

where `type` in ('track') and event in ('serverApiReqEvt')


在 2020-07-13 13:44:29,"Leonard Xu"  写道:
>Hello, fulin
>
>这个问题能提供段可以复现的代码吗?
>
>祝好,
>Leonard Xu
>
>
>> 在 2020年7月13日,09:50,Yangze Guo  写道:
>> 
>> Hi,
>> 
>> 如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2].
>> 所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。
>> 
>> [1] 
>> https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102
>> [2] 
>> https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509
>> 
>> Best,
>> Yangze Guo
>> 
>> On Sat, Jul 11, 2020 at 11:33 PM sunfulin  wrote:
>>> 
>>> hi,
>>> 根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary 
>>> key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary
>>>  key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义:
>>> 不确定是我配置使用的方式不对,还是确实存在bug。。
>>> 
>>> 
>>> CREATE TABLE ES6_SENSORDATA_OUTPUT (
>>> event varchar,
>>> user_id varchar,
>>> distinct_id varchar,
>>> _date varchar,
>>> _event_time varchar,
>>> recv_time varchar,
>>> _browser_version varchar,
>>> path_name varchar,
>>> _search varchar,
>>> event_type varchar,
>>> _current_project varchar,
>>> message varchar,
>>> stack varchar,
>>> component_stack varchar,
>>> _screen_width varchar,
>>> _screen_height varchar
>>> ) WITH (
>>> 'connector' = 'elasticsearch-6',
>>> 'hosts' = '',
>>> 'index' = 'flink_sensordata_target_event',
>>> 'document-type' = 'default',
>>> 'document-id.key-delimiter' = '$',
>>> 'sink.bulk-flush.interval' = '1000',
>>> 'failure-handler' = 'fail',
>>> 'format' = 'json'
>>> )
>>> 
>>> 
>>> 
>>> 
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling


Re:Re: flink 1.11 local execution oom问题

2020-07-13 文章 sunfulin
hi,
感谢大神的回复。看了下是因为我执行的SQL作业,有多个sink,在执行TableEnvironment.executeSQL时,Flink提交了多个job,在本地执行时貌似就走到了这个异常。我将多sink修改为TableEnvironment.createStatementSet来提交,就没有这个问题了。谢谢回复。
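In other words, rather than one executeSql() call per INSERT (each of which submits its own job), the inserts are batched into a single StatementSet, roughly as below; table names are placeholders.

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSinkSubmit {
    public static void submit(TableEnvironment tEnv) {
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("insert into sink_a select * from source_t");
        set.addInsertSql("insert into sink_b select * from source_t");
        // All added statements are optimized together and submitted as a single job.
        set.execute();
    }
}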

















在 2020-07-13 13:51:55,"Xintong Song"  写道:
>Local execution 模式下,Flink 是无法实际控制 JVM 的 Xmx, Xms, MaxDirectMemorySize
>等参数的,这些参数取决于你的 IDE 设置。
>检查一下 idea 的 run configuration 是否有配置过 -XX:MaxDirectMemorySize。
>
>Thank you~
>
>Xintong Song
>
>
>
>On Sat, Jul 11, 2020 at 3:48 PM Congxian Qiu  wrote:
>
>> Hi
>>
>> 这个问题可以看下是否和 releasenote[1] 中 memory configuration
>> 相关的修改有关,具体到这个错误,你可以按照提示增加一些内存看看
>>
>> [1]
>>
>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements
>> Best,
>> Congxian
>>
>>
>> sunfulin  于2020年7月10日周五 下午11:32写道:
>>
>> > hi,
>> >
>> >
>> 我在使用1.11版本在本地idea起一个作业时,并发为1,抛出了如下关于内存的异常。。问题是之前从来没有显示配置过taskmanager的memory参数,这是为何?
>> > 感觉由1.10升级到1.11问题还是挺多的。。我尝试增加了JVM参数,增加DirectMemory内存配置,还是没有作用,请教大神帮忙看下。
>> >
>> >
>> > Exception in thread "main" java.lang.OutOfMemoryError: Could not allocate
>> > enough memory segments for NetworkBufferPool (required (Mb): 64,
>> allocated
>> > (Mb): 63, missing (Mb): 1). Cause: Direct buffer memory. The direct
>> > out-of-memory error has occurred. This can mean two things: either job(s)
>> > require(s) a larger size of JVM direct memory or there is a direct memory
>> > leak. The direct memory can be allocated by user code or some of its
>> > dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> > configuration option should be increased. Flink framework and its
>> > dependencies also consume the direct memory, mostly for network
>> > communication. The most of network memory is managed by Flink and should
>> > not result in out-of-memory error. In certain special cases, in
>> particular
>> > for jobs with high parallelism, the framework may require more direct
>> > memory which is not managed by Flink. In this case
>> > 'taskmanager.memory.framework.off-heap.size' configuration option should
>> be
>> > increased. If the error persists then there is probably a direct memory
>> > leak in user code or some of its dependencies which has to be
>> investigated
>> > and fixed. The task executor has to be shutdown...
>>


flink 1.11 es未定义pk的sink问题

2020-07-11 文章 sunfulin
hi,
根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary 
key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary 
key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义:
不确定是我配置使用的方式不对,还是确实存在bug。。


CREATE TABLE ES6_SENSORDATA_OUTPUT (
  event varchar,
  user_id varchar,
  distinct_id varchar,
  _date varchar,
  _event_time varchar,
  recv_time varchar,
  _browser_version varchar,
  path_name varchar,
  _search varchar,
  event_type varchar,
  _current_project varchar,
  message varchar,
  stack varchar,
  component_stack varchar,
  _screen_width varchar,
  _screen_height varchar
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = '',
'index' = 'flink_sensordata_target_event',
'document-type' = 'default',
'document-id.key-delimiter' = '$',
'sink.bulk-flush.interval' = '1000',
'failure-handler' = 'fail',
'format' = 'json'
)




[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling
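For comparison, the variant that behaved as expected in later tests is the one that declares an explicit primary key, which the connector then uses to build the document id. A trimmed sketch; the key columns chosen here are only an example, not a recommendation for this table.

CREATE TABLE ES6_SENSORDATA_OUTPUT_WITH_PK (
  event varchar,
  user_id varchar,
  PRIMARY KEY (user_id, event) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = '',
'index' = 'flink_sensordata_target_event',
'document-type' = 'default',
'document-id.key-delimiter' = '$',
'sink.bulk-flush.interval' = '1000',
'failure-handler' = 'fail',
'format' = 'json'
)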

Re:flink 1.11 sql作业提交JM报错

2020-07-11 文章 sunfulin



hi,
在JM日志中还有如下异常:这个也比较诡异。求大神帮忙解答下。


java.lang.IllegalStateException: No operators defined in streaming topology. 
Cannot execute.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:67) 
~[763f9e05-39d4-4c70-bf9c-e3bea7ef0e0f_FatJob-1.0.jar:?]
at 
com.htsc.crm_realtime.fatjob.Jobs.sensordata.SensorDataETLTask.main(SensorDataETLTask.java:47)
 ~[763f9e05-39d4-4c70-bf9c-e3bea7ef0e0f_FatJob-1.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_201]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_201]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_201]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:99)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 [?:1.8.0_201]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_201]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_201]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_201]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_201]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_201]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]














在 2020-07-11 22:24:51,"sunfulin"  写道:
>hi,
>我使用flink 
>1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet
> add多个dml语句,并执行execute。
>如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?
>
>
>Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than 
>one execute() or executeAsync() call in a single environment.
>at 
>org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>at 
>org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>at 
>org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
> ~[flink-table-blink_2.12-1.11.0.jar:1.11.0]
>at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
> ~[flink-table_2.12-1.11.0.jar:1.11.0]
>... 24 more


flink 1.11 sql作业提交JM报错

2020-07-11 文章 sunfulin
hi,
我使用flink 
1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet
 add多个dml语句,并执行execute。
如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?


Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than 
one execute() or executeAsync() call in a single environment.
at 
org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
 ~[flink-table-blink_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
 ~[flink-table_2.12-1.11.0.jar:1.11.0]
... 24 more

flink 1.11 local execution oom问题

2020-07-10 文章 sunfulin
hi,
我在使用1.11版本在本地idea起一个作业时,并发为1,抛出了如下关于内存的异常。。问题是之前从来没有显示配置过taskmanager的memory参数,这是为何?
感觉由1.10升级到1.11问题还是挺多的。。我尝试增加了JVM参数,增加DirectMemory内存配置,还是没有作用,请教大神帮忙看下。


Exception in thread "main" java.lang.OutOfMemoryError: Could not allocate 
enough memory segments for NetworkBufferPool (required (Mb): 64, allocated 
(Mb): 63, missing (Mb): 1). Cause: Direct buffer memory. The direct 
out-of-memory error has occurred. This can mean two things: either job(s) 
require(s) a larger size of JVM direct memory or there is a direct memory leak. 
The direct memory can be allocated by user code or some of its dependencies. In 
this case 'taskmanager.memory.task.off-heap.size' configuration option should 
be increased. Flink framework and its dependencies also consume the direct 
memory, mostly for network communication. The most of network memory is managed 
by Flink and should not result in out-of-memory error. In certain special 
cases, in particular for jobs with high parallelism, the framework may require 
more direct memory which is not managed by Flink. In this case 
'taskmanager.memory.framework.off-heap.size' configuration option should be 
increased. If the error persists then there is probably a direct memory leak in 
user code or some of its dependencies which has to be investigated and fixed. 
The task executor has to be shutdown...
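As Xintong's reply in this thread points out, in local execution the JVM direct-memory limit comes from the IDE run configuration rather than from Flink's own memory options, so the first knob to try is the VM options of the IDEA run configuration, for example (the value is only an illustration):

-XX:MaxDirectMemorySize=512m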

Re:Re: flink 写入es失败导致数据丢失

2020-07-09 文章 sunfulin






hi,
感谢两位的回复。我先来本地复现下。如果有问题的话,会建个issue。











在 2020-07-10 11:43:33,"Congxian Qiu"  写道:
>Hi
>
>从官方文档的配置[1] 来看,对于 handle failure 来说,默认是 fail,也就是说 request 失败了会导致作业失败的,可以尝试在
>log 中看能否找到这个日志,或者显示的设置成 fail 看看。如果发现 handle failure 是 fail 的情况下不符合预期,可以想
>Leonard 说的那样建立一个 issue 来追踪这个问题
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#connector-options
>Best,
>Congxian
>
>
>Leonard Xu  于2020年7月10日周五 上午11:33写道:
>
>> Hi,
>> 我理解作业应该失败才对,你本地可以复现吗?可以复现的话可以在社区建个issue。
>>
>> Best,
>> Leonard Xu
>>
>> > 在 2020年7月10日,11:20,sunfulin  写道:
>> >
>> > ,但是作业确实没失败。
>>
>>


Re:Re: flink 写入es失败导致数据丢失

2020-07-09 文章 sunfulin



hi,Leonard
是的。es集群服务不可用。我能观察到写入es失败,但是作业确实没失败。等到es集群服务恢复后,作业也正常了,但是故障期间的数据有丢失。














在 2020-07-10 11:16:17,"Leonard Xu"  写道:
>Hello, fulin
>
>> es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。
>
>es 服务挂掉是指es 集群不可用吗?那这时应该是写入es应该失败,作业也会失败,你说的没有积压是指什么呢?
>
>Best
>Leonard Xu


Re:Re: flink 写入es失败导致数据丢失

2020-07-09 文章 sunfulin
hi
我使用社区默认的ES,主要配置如下:我使用flink 1.10.1,blink-planner。使用了ES6的sink。
我看了下文档,默认有个参数是 
connector.failure-handler,是fail。我也能在TM日志里看到连接es失败的报错,但是整个任务checkpoint并没有失败。数据丢了。


WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '',
'connector.hosts' = '',
'connector.index' = 'realtime_fund_product_all_sell',
'connector.document-type' = '_doc',
'update-mode' = 'upsert',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.bulk-flush.interval' = '1000',
'format.type' = 'json'
)
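For completeness, a sketch of the same 1.10-style options with the failure handler stated explicitly plus bulk-flush retry/back-off, which is the usual way to rule out silently dropped requests; all values below are examples only.

WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = '',
'connector.index' = 'realtime_fund_product_all_sell',
'connector.document-type' = '_doc',
'update-mode' = 'upsert',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.failure-handler' = 'fail',
'connector.flush-on-checkpoint' = 'true',
'connector.bulk-flush.back-off.type' = 'exponential',
'connector.bulk-flush.back-off.max-retries' = '3',
'connector.bulk-flush.back-off.delay' = '1000',
'connector.bulk-flush.interval' = '1000',
'format.type' = 'json'
)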

















在 2020-07-10 11:00:53,"Congxian Qiu"  写道:
>Hi
>
>你 ES Sink 是自己写的,还是用的社区的呢?社区的使用了哪个版本,以及配置是啥样的呢
>
>Best,
>Congxian
>
>
>sunfulin  于2020年7月10日周五 上午10:51写道:
>
>> hi,
>>
>> 我们现在使用flink消费kafka数据写到es,今天发现在默认设置下,es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。


flink 写入es失败导致数据丢失

2020-07-09 文章 sunfulin
hi,
我们现在使用flink消费kafka数据写到es,今天发现在默认设置下,es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。

Re:flink 双流join报错,java.lang.AssertionError

2020-07-09 文章 sunfulin



hi,
我切到最新的1.11 release版本,跑同样的sql,没有抛出异常。想问下这有相关的issue么?想确认下原因。














在 2020-07-09 16:53:34,"sunfulin"  写道:
>hi,
>我使用flink 1.10.1 
>blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select
> 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么?
>
>
>select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, A.fund_name 
>as product_name, cast(B.balance as double) as balance
>from (
>select toLong(behaviorTime, true) as recvTime, user_id,
>cast(regexp_extract(btnTitle, 'zbid=\{([^|]*)\}', 1) as int) as live_id,
>regexp_extract(btnTitle, 'fundname=\{([^|]*)\}', 1) as fund_name,
>regexp_extract(btnTitle, 'fundcode=\{([^|]*)\}', 1) as fund_code, proctime 
>from kafka_zl_etrack_event_stream
>where pageId = ''
>and eventId = 'click'
>and btnId = '
>and CHARACTER_LENGTH(user_id) > 4
>) A
>left join
>(
>select customerNumber, balance, fundCode, lastUpdateTime, proctime
>  from lscsp_sc_order_all
>   where `status` = '4'
> and businessType IN ('4','5','14','16','17','18')
> and fundCode IS NOT NULL
> and balance IS NOT NULL
> and lastUpdateTime IS NOT NULL
>) B
>on A.user_id = B.customerNumber and A.fund_code = B.fundCode
>group by  A.recvTime, A.user_id, A.live_id, A.fund_code, A.fund_name, 
>cast(B.balance as double)
>
>
>
>
>
>
>Exception in thread "main" java.lang.AssertionError
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.getRootField(SqlToRelConverter.java:4448)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter.adjustInputRef(SqlToRelConverter.java:3765)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3737)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
>at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
>at 
>org.apache.calcite.sql2rel.StandardConvertletTable.convertCast(StandardConvertletTable.java:522)
>at 
>org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)


flink 双流join报错,java.lang.AssertionError

2020-07-09 文章 sunfulin
hi,
我使用flink 1.10.1 
blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select
 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么?


select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, A.fund_name 
as product_name, cast(B.balance as double) as balance
from (
select toLong(behaviorTime, true) as recvTime, user_id,
cast(regexp_extract(btnTitle, 'zbid=\{([^|]*)\}', 1) as int) as live_id,
regexp_extract(btnTitle, 'fundname=\{([^|]*)\}', 1) as fund_name,
regexp_extract(btnTitle, 'fundcode=\{([^|]*)\}', 1) as fund_code, proctime from 
kafka_zl_etrack_event_stream
where pageId = ''
and eventId = 'click'
and btnId = '
and CHARACTER_LENGTH(user_id) > 4
) A
left join
(
select customerNumber, balance, fundCode, lastUpdateTime, proctime
  from lscsp_sc_order_all
   where `status` = '4'
 and businessType IN ('4','5','14','16','17','18')
 and fundCode IS NOT NULL
 and balance IS NOT NULL
 and lastUpdateTime IS NOT NULL
) B
on A.user_id = B.customerNumber and A.fund_code = B.fundCode
group by  A.recvTime, A.user_id, A.live_id, A.fund_code, A.fund_name, 
cast(B.balance as double)






Exception in thread "main" java.lang.AssertionError
at 
org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.getRootField(SqlToRelConverter.java:4448)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.adjustInputRef(SqlToRelConverter.java:3765)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3737)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
at 
org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
at 
org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
at 
org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
at 
org.apache.calcite.sql2rel.StandardConvertletTable.convertCast(StandardConvertletTable.java:522)
at 
org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)

Re:Re: Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 sunfulin
hi, noake
感谢分享。我加了这个依赖后也OK了。周知下大家。

















在 2020-07-07 22:15:05,"noake"  写道:
>我在1.11.0中遇到了同样的问题, pom中加了下面的依赖就解决了
><dependency>
>    <groupId>org.apache.flink</groupId>
>    <artifactId>flink-clients_${scala.binary.version}</artifactId>
>    <version>${flink.version}</version>
></dependency>
>
>
>原始邮件
>发件人:Congxian qiuqcx978132...@gmail.com
>收件人:user-zhuser...@flink.apache.org
>抄送:Jark wuimj...@gmail.com; Jun zhangzhangjunemail...@gmail.com
>发送时间:2020年7月7日(周二) 19:35
>主题:Re: Re: Re: Re: flink 1.11 作业执行异常
>
>
>Hi
>从这个报错看上去是尝试通过 serviceLoader 加载一些 factory 的时候出错了(找不到),可以看看对应的 module 的
>resources 文件下是否有对应的 resource 文件
>
>Best,
>Congxian
>
>(其后引用的 sunfulin / Jark Wu / Jun Zhang 往来邮件与下文各原始邮件内容相同,此处从略)


Re:Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 sunfulin



hi,
我的pom文件本地执行时,scope的provided都是去掉的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>



确实比较诡异。org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
 这个异常在啥情况下会触发到。














在 2020-07-07 18:10:58,"Jark Wu"  写道:
>如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 provided 掉了? 去掉 provided
>再试试看?
>
>Best,
>Jark
>
>On Tue, 7 Jul 2020 at 18:01, sunfulin  wrote:
>
>> hi,
>>  @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。
>>
>>  @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-07 15:40:17,"Jark Wu"  写道:
>> >Hi,
>> >
>> >你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
>> >
>> >Best,
>> >Jark
>> >
>> >On Tue, 7 Jul 2020 at 15:31, Jun Zhang 
>> wrote:
>> >
>> >> hi.sunfulin
>> >> 你有没有导入blink的planner呢,加入这个试试
>> >>
>> >> <dependency>
>> >>     <groupId>org.apache.flink</groupId>
>> >>     <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>> >>     <version>${flink.version}</version>
>> >> </dependency>
>> >>
>> >>
>> >> sunfulin  于2020年7月7日周二 下午3:21写道:
>> >>
>> >>>
>> >>>
>> >>>
>> >>> hi, jark
>> >>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
>> >>> configuration里的DeployOptions.TARGET
>> >>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>> >>>
>> >>>
>> >>> //构建StreamExecutionEnvironment
>> >>> public static final StreamExecutionEnvironment env =
>> >>> StreamExecutionEnvironment.getExecutionEnvironment();
>> >>>
>> >>> //构建EnvironmentSettings 并指定Blink Planner
>> >>> private static final EnvironmentSettings bsSettings =
>> >>>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >>>
>> >>> //构建StreamTableEnvironment
>> >>> public static final StreamTableEnvironment tEnv =
>> >>> StreamTableEnvironment.create(env, bsSettings);
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>tEnv.executeSql(“ddl sql”);
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> //source注册成表
>> >>>
>> >>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
>> >>> $("f1").as("first"), $("p").proctime());
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> //join语句
>> >>>
>> >>> Table table = tEnv.sqlQuery("select b.* from test a left join
>> >>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> //输出
>> >>>
>> >>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> env.execute("LookUpJoinJob");
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
>> >>> >能分享下复现的作业代码不?
>> >>> >
>> >>> >Best,
>> >>> >Jark
>> >>> >
>> >>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>> >>> >
>> >>> >> Hi,
>> >>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>> >>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
>> >>> >>
>> >>> >>
>> >>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory
>> found
>> >>> to
>> >>> >> execute the application.
>> >>> >>   at
>> >>> >>
>> >>>
>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>> >>> >>
>> >>> >>
>> >>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>> >>>
>> >>
>>


Re:Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 sunfulin
hi, 
 @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。

 @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?


















在 2020-07-07 15:40:17,"Jark Wu"  写道:
>Hi,
>
>你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
>
>Best,
>Jark
>
>On Tue, 7 Jul 2020 at 15:31, Jun Zhang  wrote:
>
>> hi.sunfulin
>> 你有没有导入blink的planner呢,加入这个试试
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>>
>>
>> sunfulin  于2020年7月7日周二 下午3:21写道:
>>
>>>
>>>
>>>
>>> hi, jark
>>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
>>> configuration里的DeployOptions.TARGET
>>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>>>
>>>
>>> //构建StreamExecutionEnvironment
>>> public static final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> //构建EnvironmentSettings 并指定Blink Planner
>>> private static final EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>
>>> //构建StreamTableEnvironment
>>> public static final StreamTableEnvironment tEnv =
>>> StreamTableEnvironment.create(env, bsSettings);
>>>
>>>
>>>
>>>
>>>
>>>tEnv.executeSql(“ddl sql”);
>>>
>>>
>>>
>>>
>>> //source注册成表
>>>
>>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
>>> $("f1").as("first"), $("p").proctime());
>>>
>>>
>>>
>>>
>>> //join语句
>>>
>>> Table table = tEnv.sqlQuery("select b.* from test a left join
>>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>>>
>>>
>>>
>>>
>>> //输出
>>>
>>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>>>
>>>
>>>
>>>
>>> env.execute("LookUpJoinJob");
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
>>> >能分享下复现的作业代码不?
>>> >
>>> >Best,
>>> >Jark
>>> >
>>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>>> >
>>> >> Hi,
>>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
>>> >>
>>> >>
>>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory found
>>> to
>>> >> execute the application.
>>> >>   at
>>> >>
>>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>>> >>
>>> >>
>>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>>>
>>


Re:Re: flink 1.11 作业执行异常

2020-07-07 文章 sunfulin



hi, jark
我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink 
configuration里的DeploymentOptions.TARGET (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。


//构建StreamExecutionEnvironment
public static final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

//构建EnvironmentSettings 并指定Blink Planner
private static final EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

//构建StreamTableEnvironment
public static final StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env, bsSettings);





   tEnv.executeSql(“ddl sql”);




//source注册成表

tEnv.createTemporaryView("test", ds, $("f0").as("id"), 
$("f1").as("first"), $("p").proctime());




//join语句

Table table = tEnv.sqlQuery("select b.* from test a left join my_dim 
FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");




//输出

tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");




env.execute("LookUpJoinJob");








在 2020-07-06 14:59:17,"Jark Wu"  写道:
>能分享下复现的作业代码不?
>
>Best,
>Jark
>
>On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>
>> Hi,
>> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>> org.apache.flink.table.api.TableExecution: Failed to execute sql
>>
>>
>> caused by : java.lang.IlleagalStateException: No ExecutorFactory found to
>> execute the application.
>>   at
>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>>
>>
>> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。


flink 1.11 作业执行异常

2020-07-05 文章 sunfulin
Hi,
我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
org.apache.flink.table.api.TableException: Failed to execute sql


caused by : java.lang.IllegalStateException: No ExecutorFactory found to 
execute the application.
  at 
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)


想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。

Re:Re:flink asynctablefunction调用异常

2020-07-03 文章 sunfulin



hi
抱歉忘记回复了。经过进一步调试发现,是因为定义的schema的column类型,与实际获取到的字段类型不一致导致。主要是在调试的过程中,CompletableFuture.complete会吃掉这种类型不一致的异常,也不下发数据。看源码发现只会在timeout的时候才调用future.completeExceptionally。记录下。
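The pattern that follows from this, for anyone else debugging an AsyncTableFunction that silently produces nothing: complete the future exceptionally on any error so a schema/type mismatch fails fast instead of only surfacing as an async timeout. A sketch; fetchData and the thread pool are placeholders for the actual RPC client.

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;

public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {

    private transient ExecutorService executor;

    @Override
    public void open(FunctionContext context) {
        executor = Executors.newFixedThreadPool(4);
    }

    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        executor.submit(() -> {
            try {
                // The Row built here must match the declared table schema field by field.
                Row row = fetchData(key);
                result.complete(row == null
                        ? Collections.<Row>emptyList()
                        : Collections.singletonList(row));
            } catch (Throwable t) {
                // Without this, a conversion error is swallowed and only the timeout fires.
                result.completeExceptionally(t);
            }
        });
    }

    private Row fetchData(String key) {
        return null; // placeholder for the RPC lookup
    }
}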














在 2020-07-03 17:01:19,"forideal"  写道:
>Hi sunfulin:
> 
>  我这么实现是可以的。
>public void eval(CompletableFuture<Collection<Row>> result, String key) {
>executorService.submit(() -> {
>try {
>Row row = fetchdata(key);
>if (row != null) {
>result.complete(Collections.singletonList(row));
>} else {
>result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
>}
>} catch (Exception e) {
>result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
>}
>});
>}
>
>
>
>
>Best forideal.
>
>
>
>
>
>在 2020-07-02 15:56:46,"sunfulin"  写道:
>>hi,
>>我在使用flink 1.10.1 blink 
>>planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
>>遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : 
>>java.lang.Exception: Could not complete the stream element: 
>>org.apache.flink.table.dataformat.BinaryRow  caused by : 
>>java.util.concurrent.TimeoutException: Async function call has timed out.
>>
>>
>>我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。


flink asynctablefunction调用异常

2020-07-02 文章 sunfulin
hi,
我在使用flink 1.10.1 blink 
planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : 
java.lang.Exception: Could not complete the stream element: 
org.apache.flink.table.dataformat.BinaryRow  caused by : 
java.util.concurrent.TimeoutException: Async function call has timed out.


我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。

Re:Re: flink sql row类型group by

2020-06-28 文章 sunfulin






hi, Leonard
这个写法应该是OK,不过我的场景下是下面这种
select a, b, row(commentId, commentContent) from T
group by a, b, commentId, commentContent
这种情况下推导PK貌似会报错(UpsertStreamTableSink requires full primary keys)。这种应该怎么写哈?
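One workaround worth trying (not verified in this thread): keep the grouped columns as plain top-level fields in the select list so that the upsert key (a, b, commentId, commentContent) can be derived and stays string-typed for the ES sink, and nest the ROW alongside them. The alias comment_info below is a placeholder.

select a, b, commentId, commentContent,
       row(commentId, commentContent) as comment_info
from T
group by a, b, commentId, commentContent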











在 2020-06-29 10:19:31,"Leonard Xu"  写道:
>Hi,
>异常信息很有用,group by ROW 在 Flink SQL 里是支持的,只是在 ElasticSearchUpsertSink 的时候不支持,原因是 
>ElasticSearchUpsertSink 需要将 group by 的 keys 字段转换成 String 用于构造 
>UpdateRequest、DeleteRequest 对象,更进一步的原因是 keys 字段转换成的 String 对应了 Es 中的 id 
>字段(String 类型)。So, 当前ElasticSearchUpsertSink 的实现中,只支持将简单类型作为 keys 
>,复杂类型不支持,复杂类型toString()的结果可能不是我们想要的。
>
>你可以试下下面的query,query keys  对应es中的 id  就是 
>commentId${keyDelimiter}commentContent, 这也应该是你需要的结果 
>Select ROW(commentId, commentContent) from T
>group by commentId, commentContent
>
>祝好,
>Leonard Xu
>
>> 在 2020年6月28日,22:33,sunfulin  写道:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> hi, 
>> 谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner
>> 
>> 
>> org.apache.flink.table.api.ValidationException: Only simple types that can 
>> be safely converted into a string representation can be used as keys. But 
>> was: Row(commentId: String, commentContent: String)
>> at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
>> at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> at 
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
>> at 
>> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
>> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
>> at 
>> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-06-28 10:15:34,"Benchao Li"  写道:
>>> Hi,
>>> 我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
>>> 能附上异常栈就更好啦。
>>> 
>>> sunfulin  于2020年6月25日周四 下午4:35写道:
>>> 
>>>> Hi,
>>>> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
>>>> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
>>> 
>>> 
>>> 
>>> -- 
>>> 
>>> Best,
>>> Benchao Li


Re:Re: flink sql row类型group by

2020-06-28 文章 sunfulin






hi, 
谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner


org.apache.flink.table.api.ValidationException: Only simple types that can be 
safely converted into a string representation can be used as keys. But was: 
Row(commentId: String, commentContent: String)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
at 
com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
at 
com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)















在 2020-06-28 10:15:34,"Benchao Li"  写道:
>Hi,
>我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
>能附上异常栈就更好啦。
>
>sunfulin  于2020年6月25日周四 下午4:35写道:
>
>> Hi,
>> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
>> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
>
>
>
>-- 
>
>Best,
>Benchao Li


flink sql row类型group by

2020-06-25 文章 sunfulin
Hi,
请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
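
针对上面 ES upsert sink 报 "Only simple types ... can be used as keys" 的问题,一个常见的绕行办法是先把 ROW 里的子字段拼成普通的 VARCHAR,再用这个字符串去 group by / 作为 key。下面是一段示意 SQL(commentId、commentContent 取自上文报错信息,source_table、es_sink 等名字均为假设),仅作思路参考:

INSERT INTO es_sink
SELECT
  CONCAT_WS('_', A.commentId, A.commentContent) AS row_key,  -- 把 ROW 拍平成字符串做 key
  COUNT(*) AS cnt
FROM source_table
GROUP BY CONCAT_WS('_', A.commentId, A.commentContent);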

Re:Re: Flink SQL使用Kafka自带的timestamp作为事件时间

2020-06-05 文章 sunfulin



谢谢Jark老大的回复。看起来在属性里增加   'timestamp.field' = 'timestamp'  
应该是我需要的。我注意到目前通过Java代码可以获取timestampFromSource,这个功能是可以拿到source的timestamp么?我测试了下貌似解析出来为空。我的Kafka版本是0.10.2。














在 2020-06-05 19:31:37,"Jark Wu"  写道:
>访问 Kafka 消息上的所有数据(timestamp, partition, key, 等等)是一个非常重要的功能,社区也很早就意识到了。
>目前已经有一个 FLIP [1] 在讨论中,预计 1.12 会支持。
>
>Best,
>Jark
>
>[1]:
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
>
>On Fri, 5 Jun 2020 at 19:19, sunfulin  wrote:
>
>> Hi,
>> 想问下Flink SQL在使用DDL创建Kafka
>> Source时,支持设置获取到Kafka自带的timestamp么?我们有场景想使用Kafka带的timestamp,这种情况下消息流中可能并不存在时间属性.
>> 如果支持的话,能否分享下具体写法哈?我尝试使用下面的SQL报错:
>>
>>
>> CREATE TABLE user_behavior (
>> test_time TIMESTAMP(3),
>> user_id STRING ,
>> item_id STRING ,
>> category_id STRING ,
>> behavior STRING,
>> ts STRING,
>> proctime as PROCTIME() -- 通过计算列产生一个处理时间列
>> ) WITH (
>> 'connector.type' = 'kafka', -- 使用 kafka connector
>> 'connector.version' = '0.10', -- kafka 版本,universal 支持 0.11 以上的版本
>> 'connector.topic' = 'test', -- kafka topic
>> 'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
>> --'connector.properties.group.id' = 'mytest',
>> 'connector.properties.zookeeper.connect' = '168.61.113.170:2181', --
>> zookeeper 地址
>> 'connector.properties.bootstrap.servers' = '168.61.113.170:9092', --
>> kafka broker 地址
>> 'format.type' = 'json' -- 数据源格式为 json
>> ,'schema.0.rowtime.timestamps.type' = 'from-source',
>> 'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
>> 'schema.0.rowtime.watermarks.delay' = '5000'
>> )
>>
>>
>>
>>
>> 异常为:
>>
>>
>>  java.lang.UnsupportedOperationException: empty.max
>>  at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
>>  at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
>>  at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
>>  at scala.Option.map(Option.scala:146)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>  at org.apache.flink.table.planner.plan.no
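
顺带补充:上面 Jark 提到的 FLIP-107 落地后(Flink 1.12+ 的新版 Kafka connector),Kafka 消息自带的 timestamp 可以声明成 METADATA 列再定义 watermark;1.10/1.11 还拿不到这个元数据,只能用消息体里的事件时间字段配合 WATERMARK 子句。下面是 1.12 风格的示意 DDL(表名、字段均为假设,属性名以所用版本文档为准):

CREATE TABLE user_behavior (
  user_id STRING,
  item_id STRING,
  behavior STRING,
  kafka_ts TIMESTAMP(3) METADATA FROM 'timestamp',  -- Kafka record 自带的时间戳
  WATERMARK FOR kafka_ts AS kafka_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '168.61.113.170:9092',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);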


Flink SQL使用Kafka自带的timestamp作为事件时间

2020-06-05 文章 sunfulin
Hi,
想问下Flink SQL在使用DDL创建Kafka 
Source时,支持设置获取到Kafka自带的timestamp么?我们有场景想使用Kafka带的timestamp,这种情况下消息流中可能并不存在时间属性.
如果支持的话,能否分享下具体写法哈?我尝试使用下面的SQL报错:


CREATE TABLE user_behavior (
test_time TIMESTAMP(3),
user_id STRING ,
item_id STRING ,
category_id STRING ,
behavior STRING,
ts STRING,
proctime as PROCTIME() -- 通过计算列产生一个处理时间列
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = '0.10', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'test', -- kafka topic
'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
--'connector.properties.group.id' = 'mytest',
'connector.properties.zookeeper.connect' = '168.61.113.170:2181', -- zookeeper 
地址
'connector.properties.bootstrap.servers' = '168.61.113.170:9092', -- kafka 
broker 地址
'format.type' = 'json' -- 数据源格式为 json
,'schema.0.rowtime.timestamps.type' = 'from-source',
'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
'schema.0.rowtime.watermarks.delay' = '5000'
)




异常为:


 java.lang.UnsupportedOperationException: empty.max
 at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
 at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
 at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
 at scala.Option.map(Option.scala:146)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
 at org.apache.flink.table.planner.plan.no

Re:flink新增计算逻辑时kafka从头开始追平消费记录

2020-04-06 文章 sunfulin



Hi,
props.put("auto.offset.reset", "latest");
是加了这个设置导致的吧














在 2020-04-07 11:27:53,"苟刚"  写道:
>Hello,
>
> 我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
>我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
>
>
>flink版本:1.6.3
>
>部分代码如下:
>
>public static void main(String[] args) throws Exception {
>final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
>StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
>
>DataStreamSource data = KafkaTools.buildSource(env);
>// 处理timing数据
>processTimingData(parameterTool, data);
>// 处理front error数据
>processFrontErrorData(parameterTool, data);
>// 处理img error数据
>processImgLoadErrorData(parameterTool, data);
>env.execute("xlog compute");
>}
>
>
>
>
>kafka的连接参数配置:
>public static Properties buildKafkaProps(ParameterTool parameterTool) {
>Properties props = parameterTool.getProperties();
>props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, 
> DEFAULT_KAFKA_BROKERS));
>props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, 
> DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
>props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, 
> DEFAULT_KAFKA_GROUP_ID));
>props.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>props.put("auto.offset.reset", "latest");
>return props;
>}
>
>
>
>
>
>
>
>--
>
>Best Wishes
>   Galen.K
>
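
就上面"每次新增逻辑都从头消费"的问题补充一段示意代码:auto.offset.reset 只在该 group.id 没有已提交 offset 时才生效;想要重启后接着上次位置消费,一般要保证 group.id 不变,并开启 checkpoint(Flink 在 checkpoint 完成时才把 offset 提交回 Kafka)或打开 Kafka 的自动提交。下面以 0.11 connector 为例,topic、地址等均为假设:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaStartupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // checkpoint 成功后才会把 offset 提交回 Kafka

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // 假设的地址
        props.put("group.id", "xlog-compute");              // group.id 变化会丢掉已提交的 offset

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("xlog", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets();  // 有已提交 offset 就接着消费,否则按 auto.offset.reset
        // consumer.setStartFromLatest();     // 或者显式只从最新位置开始

        env.addSource(consumer).print();
        env.execute("kafka startup sketch");
    }
}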


Re:回复: Flink实时写入hive异常

2020-04-02 文章 sunfulin






hi,
请教下,现有功能里,可以将hive表作为维表join么?作为temporal table。
如果可以的话,hive分区表如何join呢?一般维表join是要join最新分区的全量数据。











在 2020-04-02 17:30:39,"111"  写道:
>Hi,
>只要能解决upsert问题带来的存储碎片、读写冲突、版本回溯,实时写入Hive也是可以的,目前spark delta lake就已经做到了。
>前面jingsong也提到过,会去解决文件存储、合并等问题,那到时候flink实时写入hive就没问题了。
>Best,
>Xinghalo
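
就"Hive 表能不能作维表 join 最新分区"再补充一点:1.10/1.11 里一般是把维表放到 JDBC/HBase 这类支持 lookup 的存储里做 temporal join;从 Flink 1.12 起 Hive 分区表可以直接作为 temporal table,并声明只加载最新分区、定期刷新。下面是 1.12 文档风格的示意写法(表名、字段、刷新间隔均为假设,属性名以实际版本文档为准):

SELECT o.order_id, d.cust_name
FROM orders AS o
JOIN dim_hive_table /*+ OPTIONS(
        'streaming-source.enable' = 'true',
        'streaming-source.partition.include' = 'latest',
        'streaming-source.monitor-interval' = '12 h') */
    FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.cust_id = d.cust_id;

注意:使用这种 SQL hints 一般需要先打开 table.dynamic-table-options.enabled。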


Re:回复: Flink实时写入hive异常

2020-04-02 文章 sunfulin



Hi,
这里就涉及到一个话题,应该怎么去实践实时和离线数仓的数据融合。我能理解这个技术上的不合理,但是通过Flink实现数仓ETL的功能,我相信很多使用Flink的会将之作为一个重要场景。














在 2020-04-01 16:05:54,"111"  写道:
>
>
>Hi,
>流写入hive,其实是属于数据湖的概念范畴。
>因为流往hive里面写,会造成很多的碎片文件,对hdfs造成性能影响,因此一般不会在流场景下直接写入hive。
>详细的可以了解 Delta lake 或 hudi。
>
>
>在2020年04月1日 15:05,sunfulin 写道:
>Hi,
>场景其实很简单,就是通过Flink实时将kafka数据做个同步到hive。hive里创建了分区表。
>我感觉这个场景很常见吧。之前以为是支持的,毕竟可以在通过hivecatalog创建kafka table。但是创建了不能写,有点不合理。
>OK吧。想问下FLIP-115计划是在哪个release版本支持哈?1.11么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-04-01 15:01:32,"Jingsong Li"  写道:
>
>Hi,
>
>
>Batch模式来支持Kafka -> Hive,也是不推荐的哦,FLIP-115后才可以在streaming模式支持这类场景。
>
>
>你可以描述下详细堆栈、应用场景、SQL吗?
>
>
>Best,
>Jingsong Lee
>
>
>On Wed, Apr 1, 2020 at 2:56 PM sunfulin  wrote:
>
>
>
>
>
>我使用batch mode时,又抛出了如下异常:感觉一步一个坑。。sigh
>
>
>
>org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not 
>enough rules to produce a node with desired properties
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-04-01 14:49:41,"Jingsong Li"  写道:
>Hi,
>
>异常的意思是现在hive sink还不支持streaming模式,只能用于batch模式中。功能正在开发中[1]
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
>Best,
>Jingsong Lee
>
>On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>
>Hi,
>我这边在使用Flink消费Kafka数据写入hive。配置连接都OK,但是在实际执行insert into
>xxx_table时,报了如下异常。这个看不懂啥原因,求大神指教。
>cc  @Jingsong Li  @Jark Wu
>
>
>
>
>org.apache.flink.table.api.TableException: Stream Tables can only be
>emitted by AppendStreamTableSink, RetractStreamTableSink, or
>UpsertStreamTableSink.
>
>at
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>
>at
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>
>at
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>
>at
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>
>at
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>
>at
>org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>
>at
>scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
>at scala.collection.Iterator.foreach(Iterator.scala:937)
>
>at scala.collection.Iterator.foreach$(Iterator.scala:937)
>
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>
>at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>
>at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>
>at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
>at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>
>at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>
>at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>
>at
>org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>
>at
>org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>
>at
>org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>
>at
>org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>
>at
>com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>
>at
>com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
>
>
>
>--
>Best, Jingsong Lee
>
>
>
>
>
>
>
>
>
>
>
>--
>
>Best, Jingsong Lee


Re:Re: Re: Flink实时写入hive异常

2020-04-01 文章 sunfulin
Hi,
场景其实很简单,就是通过Flink实时将kafka数据做个同步到hive。hive里创建了分区表。
我感觉这个场景很常见吧。之前以为是支持的,毕竟可以在通过hivecatalog创建kafka table。但是创建了不能写,有点不合理。
OK吧。想问下FLIP-115计划是在哪个release版本支持哈?1.11么?
















在 2020-04-01 15:01:32,"Jingsong Li"  写道:

Hi,


Batch模式来支持Kafka -> Hive,也是不推荐的哦,FLIP-115后才可以在streaming模式支持这类场景。


你可以描述下详细堆栈、应用场景、SQL吗?


Best,
Jingsong Lee


On Wed, Apr 1, 2020 at 2:56 PM sunfulin  wrote:





我使用batch mode时,又抛出了如下异常:感觉一步一个坑。。sigh



org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough 
rules to produce a node with desired properties














在 2020-04-01 14:49:41,"Jingsong Li"  写道:
>Hi,
>
>异常的意思是现在hive sink还不支持streaming模式,只能用于batch模式中。功能正在开发中[1]
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
>Best,
>Jingsong Lee
>
>On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>
>> Hi,
>> 我这边在使用Flink消费Kafka数据写入hive。配置连接都OK,但是在实际执行insert into
>> xxx_table时,报了如下异常。这个看不懂啥原因,求大神指教。
>> cc  @Jingsong Li  @Jark Wu
>>
>>
>>
>>
>> org.apache.flink.table.api.TableException: Stream Tables can only be
>> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> UpsertStreamTableSink.
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>
>>  at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>>
>>  at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>
>>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>>
>>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>>
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>>
>>  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>>
>>  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>>
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>
>>  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>>
>>  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>>
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>
>>  at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>
>>  at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>
>>  at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>
>>  at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>
>>  at
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>>
>>  at
>> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
>
>
>
>-- 
>Best, Jingsong Lee





 





--

Best, Jingsong Lee
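
补充一个 FLIP-115 在 1.11 落地后的大致形态,流式写 Hive 分区表的示意 SQL 如下(SQL client 风格;表名、分区提交策略等仅为示例,属性名以 1.11 文档为准):

SET table.sql-dialect=hive;
CREATE TABLE hive_log (
  user_id STRING,
  behavior STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

SET table.sql-dialect=default;
INSERT INTO hive_log
SELECT user_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')
FROM kafka_source;

小文件合并、存储格式等问题仍需结合实际集群再评估。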

Re:Re: Flink实时写入hive异常

2020-04-01 文章 sunfulin



我使用batch mode时,又抛出了如下异常:感觉一步一个坑。。sigh



org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough 
rules to produce a node with desired properties














在 2020-04-01 14:49:41,"Jingsong Li"  写道:
>Hi,
>
>异常的意思是现在hive sink还不支持streaming模式,只能用于batch模式中。功能正在开发中[1]
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
>Best,
>Jingsong Lee
>
>On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>
>> Hi,
>> 我这边在使用Flink消费Kafka数据写入hive。配置连接都OK,但是在实际执行insert into
>> xxx_table时,报了如下异常。这个看不懂啥原因,求大神指教。
>> cc  @Jingsong Li  @Jark Wu
>>
>>
>>
>>
>> org.apache.flink.table.api.TableException: Stream Tables can only be
>> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> UpsertStreamTableSink.
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>
>>  at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>>
>>  at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>
>>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>>
>>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>>
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>>
>>  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>>
>>  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>>
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>
>>  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>>
>>  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>>
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>
>>  at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>
>>  at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>
>>  at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>
>>  at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>
>>  at
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>>
>>  at
>> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
>
>
>
>-- 
>Best, Jingsong Lee
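
补充一点背景和一段示意代码:Kafka 这类无界 source 只能在流模式下做物理规划,在 batch 模式里跑 Kafka -> Hive 时 planner 找不到可用的规则,常见表现就是上面的 CannotPlanException。1.10 里创建 blink batch 环境的大致写法如下(batch 模式下两端都得是有界表,比如 Hive 到 Hive;SQL 与表名均为假设):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchHiveSketch {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 这里省略了注册 HiveCatalog 的步骤;batch 模式下读写有界的 Hive 表是支持的,
        // 以 Kafka 为 source 则要等 FLIP-115 的 streaming 文件/Hive sink
        tableEnv.sqlUpdate("INSERT INTO hive_sink SELECT * FROM hive_source");
        tableEnv.execute("batch hive sketch");
    }
}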


Flink实时写入hive异常

2020-04-01 文章 sunfulin
Hi, 
我这边在使用Flink消费Kafka数据写入hive。配置连接都OK,但是在实际执行insert into 
xxx_table时,报了如下异常。这个看不懂啥原因,求大神指教。
cc  @Jingsong Li  @Jark Wu 




org.apache.flink.table.api.TableException: Stream Tables can only be emitted by 
AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)

 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)

 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)

 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)

 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)

 at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)

 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

 at scala.collection.Iterator.foreach(Iterator.scala:937)

 at scala.collection.Iterator.foreach$(Iterator.scala:937)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)

 at scala.collection.IterableLike.foreach(IterableLike.scala:70)

 at scala.collection.IterableLike.foreach$(IterableLike.scala:69)

 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

 at scala.collection.TraversableLike.map(TraversableLike.scala:233)

 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)

 at scala.collection.AbstractTraversable.map(Traversable.scala:104)

 at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)

 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)

 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)

 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)

 at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)

 at 
com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j

Re:在Flink SQL的JDBC Connector中,Oracle的TIMESTAMP字段类型转换异常问题

2020-03-27 文章 sunfulin



Hi,
据我所知现在还不能直接支持Oracle的driver吧?你是怎么使用Flink SQL读写oracle的哈?














在 2020-03-27 17:21:21,"111"  写道:
>Hi,
>在使用Flink SQL读写Oracle JDBC表时,遇到了timestamp转换异常:
>Caused by: java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast 
>to java.sql.Timestamp at 
>org.apache.flink.table.dataformat.DataFormatConverters$TimestampConverter.toInternalImpl(DataFormatConverters.java:860)
> at 
>org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toInternal(DataFormatConverters.java:344)
> at 
>org.apache.flink.table.dataformat.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1377)
> at 
>org.apache.flink.table.dataformat.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1365)
>看报错原因是Oracle JDBC返回的是oracle.sql.Timestamp, 
>而Flink的runtime-blink里面需要的参数类型java.sql.Timestamp。网上看到了解决方案:https://stackoverflow.com/questions/13269564/java-lang-classcastexception-oracle-sql-timestamp-cannot-be-cast-to-java-sql-ti/22055190#22055190不过我们是yarn
> session模式启动,如果想要修改系统参数需要把集群的每个参数都修改一遍。请问官方是否遇到这个问题,如何更优雅的解决?
>Best,Xinghalo
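
一个相对轻量的绕行思路(基于 Oracle 驱动的 oracle.jdbc.J2EE13Compliant 开关,是否适用请以所用驱动版本为准):让驱动按 JDBC 规范直接返回 java.sql.Timestamp。对 yarn session 模式,可以在启动 session 前通过 flink-conf.yaml 统一下发 JVM 参数,不必逐台机器改系统配置,示意如下:

env.java.opts: -Doracle.jdbc.J2EE13Compliant=true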


Re:Re: Re: flinkSQL join表的历史信息保存在哪里保存多久

2020-03-12 文章 sunfulin




这样来用:
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);







在 2020-03-12 14:11:31,"wangl...@geekplus.com.cn"  写道:
>
>这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。
>StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>StreamQueryConfig qConfig = tableEnv.queryConfig();
>
>
>
>wangl...@geekplus.com.cn 
>
> 
>Sender: jinhai wang
>Send Time: 2020-03-12 13:44
>Receiver: user-zh@flink.apache.org
>Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久
>应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
> 
> 
>在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
> 
>
>两个从 kafka 创建的表:
>
>tableA: key  valueA
>tableB: key  valueB 
>
>用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
> tableA join tableB on tableA.key = tableB.key;
>这两个表的历史数据在 flink 中存在哪里?存多久呢?
>
>比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
> 
>谢谢,
>王磊
>
>
>
>wangl...@geekplus.com.cn 
>
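
就上面 setIdleStateRetentionTime 的用法补充一段 1.10 风格的完整示意代码(12/24 小时的取值只是举例):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // join 两侧的历史状态最少保留 12 小时、最多 24 小时,过期被清理后,迟到很久的 key 就 join 不上了
        tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}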
> 


Re:Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 文章 sunfulin




Hi, 
Yep, I am using 1.10
Did you submit the job in the cluster or just run it in your IDE? Because I can 
also run it successfully in my IDE, but cannot run it through cluster by a 
shading jar. So I think maybe the problem is related with maven jar classpath. 
But not sure about that. 


If you can submit the job by a shade jar through cluster , could you share the 
project pom settings and sample test code ?







At 2020-03-02 20:36:06, "Benchao Li"  wrote:
>Hi fulin,
>
>I cannot reproduce your exception on current master using your SQLs. I
>searched the error message, it seems that this issue[1] is similar with
>yours, but it seems that current compile util does not have this issue.
>
>BTW, do you using 1.10?
>
>[1] https://issues.apache.org/jira/browse/FLINK-7490
>
>sunfulin  于2020年3月2日周一 上午11:17写道:
>
>>
>>
>>
>> *create table **lscsp_sc_order_all *(
>>   amount *varchar  *,
>>   argType *varchar*,
>>   balance *varchar*,
>>   branchNo *varchar  *,
>>   businessType *varchar *,
>>   channelType *varchar *,
>>   counterOrderNo *varchar  *,
>>   counterRegisteredDate *varchar*,
>>   custAsset *varchar  *,
>>   customerNumber *varchar*,
>>   customerType *varchar*,
>>   discountId *varchar*,
>>   doubleRecordFlag *varchar*,
>>   doubleRecordType *varchar*,
>>   exceedFlag *varchar*,
>>   fundAccount *varchar*,
>>   fundCode *varchar*,
>>   fundCompany *varchar*,
>>   fundName *varchar*,
>>   fundRecruitmentFlag *varchar*,
>>   id *varchar*,
>>   lastUpdateTime *varchar*,
>>   opBranchNo *varchar*,
>>   opStation *varchar*,
>>   orderNo *varchar*,
>>   orgEntrustNo *varchar*,
>>   orgOrderNo *varchar*,
>>   prodId *varchar*,
>>   prodInvestorType *varchar*,
>>   prodLeafType *varchar*,
>>   prodRisk *varchar*,
>>   prodRiskFlag *varchar*,
>>   prodRootType *varchar*,
>>   prodTerm *varchar*,
>>   prodVariety *varchar*,
>>   quaInvestorFlag *varchar*,
>>   quaInvestorSource *varchar*,
>>   quickPurchaseFlag *varchar*,
>>   remark *varchar*,
>>   remark1 *varchar*,
>>   remark2 *varchar*,
>>   remark3 *varchar*,
>>   riskFlag *varchar*,
>>   scRcvTime *varchar*,
>>   scSendTime *varchar*,
>>   signId *varchar*,
>>   signSpecialRiskFlag *varchar*,
>>   source *varchar*,
>>   *status** varchar*,
>>   subRiskFlag *varchar*,
>>   sysNodeId *varchar*,
>>   taSerialNo *varchar*,
>>   termFlag *varchar*,
>>   token *varchar*,
>>   tradeConfirmDate *varchar*,
>>   transFundCode *varchar*,
>>   transProdId *varchar*,
>>   varietyFlag *varchar*,
>>   zlcftProdType *varchar*,
>>   proctime *as *PROCTIME()
>> *-- 通过计算列产生一个处理时间列*)
>>
>> *with*(
>>   *'connector.type' *= *'kafka'*,
>> *-- 使用 kafka connector  **'connector.version' *= *'0.10'*,
>> *-- kafka 版本,universal 支持 0.11 以上的版本  **'connector.topic' *= *''*,
>>
>> *-- kafka topic  **'connector.startup-mode' *= *'group-offsets'*,
>> *-- 从起始 offset 开始读取  **'connector.properties.zookeeper.connect' *=
>> *''*,
>> *-- zookeeper 地址  **'connector.properties.bootstrap.servers' *=
>> *''*,
>> *-- kafka broker 地址  **'connector.properties.group.id
>> <http://connector.properties.group.id>' *=
>> *'acrm-realtime-saleorder-consumer-1'*,
>>   *'format.type' *= *'json'  *
>> *-- 数据源格式为 json*)
>>
>>
>> *CREATE TABLE **dim_app_cust_info *(
>> cust_id *varchar *,
>> open_comp_name *varchar *,
>> open_comp_id *varchar *,
>> org_name *varchar *,
>> org_id *varchar*,
>> comp_name *varchar *,
>> comp_id *varchar *,
>> mng_name *varchar *,
>> mng_id *varchar *,
>> is_tg *varchar *,
>> cust_name *varchar *,
>> cust_type *varchar*,
>> avg_tot_aset_y365 *double *,
>> avg_aset_create_y
>> *double*) *WITH *(
>> *'connector.type' *= *'jdbc'*,
>> *'connector.url' *= *''*,
>> *'connector.table' *= *'app_cust_serv_rel_info'*,
>> *'connector.driver' *= *'com.mysql.jdbc.Driver'*,
>> *'connector.username' *= *'admin'*,
>> *'connector.password' *= *'Windows7'*,
>> *'connector.lookup.cache.max-rows' *= *'8000'*,
>> *'connector.lookup.cache.ttl' *= *'30min'*,
>> *'connector.lookup.max-retries' *=
>> *'3'*)
>>
>>
>>
>> At 2020-03-02 09:16:05, "Benchao Li"  wrote:
>> >Could you also provide us the DDL for lscsp_sc_order_all
>> >and dim_app_cust_info ?
>> >
>> >sunfulin  于2020年3月1日

Re:Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 文章 sunfulin







create table lscsp_sc_order_all (
  amount varchar  ,
  argType varchar,
  balance varchar,
  branchNo varchar  ,
  businessType varchar ,
  channelType varchar ,
  counterOrderNo varchar  ,
  counterRegisteredDate varchar,
  custAsset varchar  ,
  customerNumber varchar,
  customerType varchar,
  discountId varchar,
  doubleRecordFlag varchar,
  doubleRecordType varchar,
  exceedFlag varchar,
  fundAccount varchar,
  fundCode varchar,
  fundCompany varchar,
  fundName varchar,
  fundRecruitmentFlag varchar,
  id varchar,
  lastUpdateTime varchar,
  opBranchNo varchar,
  opStation varchar,
  orderNo varchar,
  orgEntrustNo varchar,
  orgOrderNo varchar,
  prodId varchar,
  prodInvestorType varchar,
  prodLeafType varchar,
  prodRisk varchar,
  prodRiskFlag varchar,
  prodRootType varchar,
  prodTerm varchar,
  prodVariety varchar,
  quaInvestorFlag varchar,
  quaInvestorSource varchar,
  quickPurchaseFlag varchar,
  remark varchar,
  remark1 varchar,
  remark2 varchar,
  remark3 varchar,
  riskFlag varchar,
  scRcvTime varchar,
  scSendTime varchar,
  signId varchar,
  signSpecialRiskFlag varchar,
  source varchar,
  status varchar,
  subRiskFlag varchar,
  sysNodeId varchar,
  taSerialNo varchar,
  termFlag varchar,
  token varchar,
  tradeConfirmDate varchar,
  transFundCode varchar,
  transProdId varchar,
  varietyFlag varchar,
  zlcftProdType varchar,
  proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
)
with
(
  'connector.type' = 'kafka',  -- 使用 kafka connector
  'connector.version' = '0.10',  -- kafka 版本,universal 支持 0.11 以上的版本
  'connector.topic' = '',  -- kafka topic
  'connector.startup-mode' = 'group-offsets',  -- 从起始 offset 开始读取
  'connector.properties.zookeeper.connect' = '',  -- zookeeper 地址
  'connector.properties.bootstrap.servers' = '',  -- kafka 
broker 地址
  'connector.properties.group.id' = 'acrm-realtime-saleorder-consumer-1',
  'format.type' = 'json'  -- 数据源格式为 json
)







CREATE TABLE dim_app_cust_info (
cust_id varchar ,
open_comp_name varchar ,
open_comp_id varchar ,
org_name varchar ,
org_id varchar,
comp_name varchar ,
comp_id varchar ,
mng_name varchar ,
mng_id varchar ,
is_tg varchar ,
cust_name varchar ,
cust_type varchar,
avg_tot_aset_y365 double ,
avg_aset_create_y double
) WITH (
'connector.type' = 'jdbc',
'connector.url' = '',
'connector.table' = 'app_cust_serv_rel_info',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'admin',
'connector.password' = 'Windows7',
'connector.lookup.cache.max-rows' = '8000',
'connector.lookup.cache.ttl' = '30min',
'connector.lookup.max-retries' = '3'
)








At 2020-03-02 09:16:05, "Benchao Li"  wrote:
>Could you also provide us the DDL for lscsp_sc_order_all
>and dim_app_cust_info ?
>
>sunfulin  于2020年3月1日周日 下午9:22写道:
>
>>
>> *CREATE TABLE **realtime_product_sell *(
>>   sor_pty_id *varchar*,
>>   entrust_date *varchar*,
>>   entrust_time *varchar*,
>>   product_code *varchar *,
>>   business_type *varchar *,
>>   balance *double *,
>>   cust_name *varchar *,
>>   open_comp_name *varchar *,
>>   open_comp_id *varchar *,
>>   org_name *varchar *,
>>   org_id *varchar *,
>>   comp_name *varchar *,
>>   comp_id *varchar *,
>>   mng_name *varchar *,
>>   mng_id *varchar *,
>>   is_tg *varchar *,
>>   cust_type *varchar *,
>>   avg_tot_aset_y365 *double *,
>>   avg_aset_create_y
>> *double*) *WITH *(
>> *'connector.type' *= *'elasticsearch'*,
>> *'connector.version' *= *''*,
>> *'connector.hosts' *= *''*,
>> *'connector.index' *= *'realtime_product_sell_007118'*,
>> *'connector.document-type' *= *'_doc'*,
>> *'update-mode' *= *'upsert'*,
>> *'connector.key-delimiter' *= *'$'*,
>> *'connector.key-null-literal' *= *'n/a'*,
>> *'connector.bulk-flush.interval' *= *'1000'*,
>> *'format.type' *=
>> *'json'*)
>>
>>
>>
>>
>>
>> At 2020-03-01 21:08:08, "Benchao Li"  wrote:
>> >The UDF looks good. Could you also paste your DDL? Then we can produce your
>> >bug easily.
>> >
>> >sunfulin  于2020年3月1日周日 下午6:39写道:
>> >
>> >> Below is the code. The function trans origin field timeStr "2020-03-01
>> >> 12:01:00.234" to target timeStr accroding to dayTag.
>> >>
>> >> *public class *ts2Date *extends *ScalarFunction {
>> >> *public *ts2Date() {
>> >>
>> >> }
>> >>
>> >>
>> >> *public *String eval (String timeStr, *boolean *dayTag) {
>> >>
>> >> *if*(timeStr == *null*) {
>> >> *return null*;
>> >> }
>> >> SimpleDateFormat ortSf = *new *SimpleDateFormat(*"

Re:Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 文章 sunfulin



CREATE TABLE realtime_product_sell (
  sor_pty_id varchar,
  entrust_date varchar,
  entrust_time varchar,
  product_code varchar ,
  business_type varchar ,
  balance double ,
  cust_name varchar ,
  open_comp_name varchar ,
  open_comp_id varchar ,
  org_name varchar ,
  org_id varchar ,
  comp_name varchar ,
  comp_id varchar ,
  mng_name varchar ,
  mng_id varchar ,
  is_tg varchar ,
  cust_type varchar ,
  avg_tot_aset_y365 double ,
  avg_aset_create_y double
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '',
'connector.hosts' = '',
'connector.index' = 'realtime_product_sell_007118',
'connector.document-type' = '_doc',
'update-mode' = 'upsert',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.bulk-flush.interval' = '1000',
'format.type' = 'json'
)











At 2020-03-01 21:08:08, "Benchao Li"  wrote:
>The UDF looks good. Could you also paste your DDL? Then we can produce your
>bug easily.
>
>sunfulin  于2020年3月1日周日 下午6:39写道:
>
>> Below is the code. The function trans origin field timeStr "2020-03-01
>> 12:01:00.234" to target timeStr accroding to dayTag.
>>
>> *public class *ts2Date *extends *ScalarFunction {
>> *public *ts2Date() {
>>
>> }
>>
>>
>> *public *String eval (String timeStr, *boolean *dayTag) {
>>
>> *if*(timeStr == *null*) {
>> *return null*;
>> }
>> SimpleDateFormat ortSf = *new *SimpleDateFormat(*"-MM-dd
>> HH:mm:ss.SSS"*);
>> Date date = *new *Date();
>> *try *{
>> date = ortSf.parse(timeStr);
>> } *catch *(ParseException e) {
>> e.printStackTrace();
>> *return null*;
>> }
>> *if *(dayTag) {
>> String format = *"-MM-dd"*;
>> SimpleDateFormat sf = *new *SimpleDateFormat(format);
>> *return *sf.format(date);
>> } *else *{
>> String format = *"-MM-dd**\'**T**\'**HH:mm:00.000+0800"*;
>> SimpleDateFormat sf = *new *SimpleDateFormat(format);
>> *return *sf.format(date);
>> }
>> }
>> }
>>
>>
>>
>> At 2020-03-01 18:14:30, "Benchao Li"  wrote:
>>
>> Could you show how your UDF `ts2Date` is implemented?
>>
>> sunfulin  于2020年3月1日周日 下午6:05写道:
>>
>>> Hi, Benchao,
>>> Thanks for the reply.
>>>
>>> Could you provide us more information?
>>> 1. what planner are you using? blink or legacy planner?
>>> I am using Blink Planner. Not test with legacy planner because my program
>>> depend a lot of new feature based on blink planner.
>>> 2. how do you register your UDF?
>>> Just use the code :  tableEnv.registerFunction ("ts2Date", new
>>> ts2Date());tableEnv is a StreamTableEnvironment.
>>> 3. does this has a relation with checkpointing? what if you enable
>>> checkpointing and not use your udf? and disable checkpointing and use udf?
>>> I don't think this is related with checkpoint. If I enable checkpointing
>>> and not use my udf, I did not see any exception and submit job
>>> successfully. If I disable checkpointing and use udf, the job can submit
>>> successfully too.
>>>
>>> I dive a lot with this exception. Maybe it is related with some
>>> classloader issue. Hope for your suggestion.
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-03-01 17:54:03,"Benchao Li"  写道:
>>>
>>> Hi fulin,
>>>
>>> It seems like a bug in the code generation.
>>>
>>> Could you provide us more information?
>>> 1. what planner are you using? blink or legacy planner?
>>> 2. how do you register your UDF?
>>> 3. does this has a relation with checkpointing? what if you enable
>>> checkpointing and not use your udf? and disable checkpointing and use udf?
>>>
>>> sunfulin  于2020年3月1日周日 下午5:41写道:
>>>
>>>> Hi, guys
>>>> I am running Flink 1.10 SQL job with UpsertStreamSink to ElasticSearch.
>>>> In my sql logic, I am using a UDF like ts2Date to handle date format stream
>>>> fields. However, when I add the `env.enableCheckpointing(time)`, my job
>>>> failed to submit and throws exception like following. This is really weird,
>>>> cause when I remove the UDF, the job can submit successfully. Any
>>>> suggestion is highly appreciated. Besides, my sql logic is like :
>>>>
>>>> *INSERT INTO *realtime_product_sell
>>>> *select *U.sor_pty_id,
&

Re:Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 文章 sunfulin
Below is the code. The function converts the original field timeStr (e.g. "2020-03-01
12:01:00.234") into the target time string according to dayTag.



import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.table.functions.ScalarFunction;

public class ts2Date extends ScalarFunction {

    public ts2Date() {
    }

    /**
     * Converts a "yyyy-MM-dd HH:mm:ss.SSS" time string into the target format:
     * "yyyy-MM-dd" when dayTag is true, otherwise a minute-level timestamp with
     * a +0800 offset suffix. Returns null if the input is null or cannot be parsed.
     */
    public String eval(String timeStr, boolean dayTag) {
        if (timeStr == null) {
            return null;
        }
        // SimpleDateFormat is not thread-safe, so a new instance is created per call
        SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Date date;
        try {
            date = ortSf.parse(timeStr);
        } catch (ParseException e) {
            e.printStackTrace();
            return null;
        }
        if (dayTag) {
            return new SimpleDateFormat("yyyy-MM-dd").format(date);
        } else {
            return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:00.000+0800").format(date);
        }
    }
}







At 2020-03-01 18:14:30, "Benchao Li"  wrote:

Could you show how your UDF `ts2Date` is implemented?


sunfulin  于2020年3月1日周日 下午6:05写道:

Hi, Benchao,
Thanks for the reply. 


Could you provide us more information?
1. what planner are you using? blink or legacy planner?
I am using Blink Planner. Not test with legacy planner because my program 
depend a lot of new feature based on blink planner.
2. how do you register your UDF?
Just use the code :  tableEnv.registerFunction ("ts2Date", new ts2Date());
tableEnv is a StreamTableEnvironment.
3. does this has a relation with checkpointing? what if you enable 
checkpointing and not use your udf? and disable checkpointing and use udf?
I don't think this is related with checkpoint. If I enable checkpointing and 
not use my udf, I did not see any exception and submit job successfully. If I 
disable checkpointing and use udf, the job can submit successfully too. 


I dive a lot with this exception. Maybe it is related with some classloader 
issue. Hope for your suggestion. 











在 2020-03-01 17:54:03,"Benchao Li"  写道:

Hi fulin,


It seems like a bug in the code generation.


Could you provide us more information?
1. what planner are you using? blink or legacy planner?
2. how do you register your UDF?
3. does this has a relation with checkpointing? what if you enable 
checkpointing and not use your udf? and disable checkpointing and use udf?


sunfulin  于2020年3月1日周日 下午5:41写道:

Hi, guys
I am running Flink 1.10 SQL job with UpsertStreamSink to ElasticSearch. In my 
sql logic, I am using a UDF like ts2Date to handle date format stream fields. 
However, when I add the `env.enableCheckpointing(time)`, my job failed to 
submit and throws exception like following. This is really weird, cause when I 
remove the UDF, the job can submit successfully. Any suggestion is highly 
appreciated. Besides, my sql logic is like : 



INSERT INTO realtime_product_sell
select U.sor_pty_id,
   U.entrust_date,
   U.entrust_time,
   U.product_code,
   U.business_type,
   sum(cast(U.balance as double)) as balance,
   COALESCE(C.cust_name, '--') as cust_name,
   COALESCE(C.open_comp_name, '--') AS open_comp_name,
   COALESCE(C.open_comp_id, '--') as open_comp_id,
   COALESCE(C.org_name,'--') as org_name,
   COALESCE(C.org_id,'--') as comp_name,
   COALESCE(C.comp_name, '--') AS comp_name,
   COALESCE(C.comp_id,'--') as comp_id,
   COALESCE(C.mng_name,'--') as mng_name,
   COALESCE(C.mng_id,'--') as mng_id,
   COALESCE(C.is_tg,'--') as is_tg,
   COALESCE(C.cust_type,'--') as cust_type,
   COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
   COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
from
(select customerNumber as sor_pty_id,
ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF 
   ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
fundCode as product_code,
businessType as business_type,
balance,
proctime
  from lscsp_sc_order_all where fundCode in ('007118','007117') and 
businessType in ('5') ) as U
left join
dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
on U.sor_pty_id = C.cust_id
group by sor_pty_id,
entrust_date,
entrust_time,
product_code,
business_type,
COALESCE(C.cust_name, '--'),
COALESCE(C.open_comp_name, '--'),
COALESCE(C.open_comp_id, '--'),
COALESCE(C.org_name,'--'),
COALESCE(C.org_id,'--'),
COALESCE(C.comp_name, '--'),
COALESCE(C.comp_id,'--'),
COALESCE(C.mng_name,'--'),
COALESCE(C.mng_id,'--'),
COALESCE(C.is_tg,'--'),
COALESCE(C.cust_type,'--'),
COALESCE(C.avg_tot_aset_y365, 0.00),
COALESCE(C.avg_aset_create_y, 0.00)




2020-03-01 17:22:06,504 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.Inv

Re:Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 文章 sunfulin
Hi, Benchao,
Thanks for the reply. 


Could you provide us more information?
1. what planner are you using? blink or legacy planner?
I am using Blink Planner. Not test with legacy planner because my program 
depend a lot of new feature based on blink planner.
2. how do you register your UDF?
Just use the code :  tableEnv.registerFunction ("ts2Date", new ts2Date());
tableEnv is a StreamTableEnvironment.
3. does this has a relation with checkpointing? what if you enable 
checkpointing and not use your udf? and disable checkpointing and use udf?
I don't think this is related with checkpoint. If I enable checkpointing and 
not use my udf, I did not see any exception and submit job successfully. If I 
disable checkpointing and use udf, the job can submit successfully too. 


I dive a lot with this exception. Maybe it is related with some classloader 
issue. Hope for your suggestion. 
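
For reference, a minimal self-contained sketch of the combination under discussion (blink planner + enableCheckpointing + the registered UDF), which may help isolate the issue; the SQL and table names are made up and assumed to be registered elsewhere:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ReproSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // the setting that triggers the failure when submitting the shaded jar

        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        tableEnv.registerFunction("ts2Date", new ts2Date());  // same registration as above
        tableEnv.sqlUpdate("INSERT INTO sink_table SELECT ts2Date(ts, true) FROM source_table");
        tableEnv.execute("repro sketch");
    }
}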











在 2020-03-01 17:54:03,"Benchao Li"  写道:

Hi fulin,


It seems like a bug in the code generation.


Could you provide us more information?
1. what planner are you using? blink or legacy planner?
2. how do you register your UDF?
3. does this has a relation with checkpointing? what if you enable 
checkpointing and not use your udf? and disable checkpointing and use udf?


sunfulin  于2020年3月1日周日 下午5:41写道:

Hi, guys
I am running Flink 1.10 SQL job with UpsertStreamSink to ElasticSearch. In my 
sql logic, I am using a UDF like ts2Date to handle date format stream fields. 
However, when I add the `env.enableCheckpointing(time)`, my job failed to 
submit and throws exception like following. This is really weird, cause when I 
remove the UDF, the job can submit successfully. Any suggestion is highly 
appreciated. Besides, my sql logic is like : 



INSERT INTO realtime_product_sell
select U.sor_pty_id,
   U.entrust_date,
   U.entrust_time,
   U.product_code,
   U.business_type,
   sum(cast(U.balance as double)) as balance,
   COALESCE(C.cust_name, '--') as cust_name,
   COALESCE(C.open_comp_name, '--') AS open_comp_name,
   COALESCE(C.open_comp_id, '--') as open_comp_id,
   COALESCE(C.org_name,'--') as org_name,
   COALESCE(C.org_id,'--') as comp_name,
   COALESCE(C.comp_name, '--') AS comp_name,
   COALESCE(C.comp_id,'--') as comp_id,
   COALESCE(C.mng_name,'--') as mng_name,
   COALESCE(C.mng_id,'--') as mng_id,
   COALESCE(C.is_tg,'--') as is_tg,
   COALESCE(C.cust_type,'--') as cust_type,
   COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
   COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
from
(select customerNumber as sor_pty_id,
ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF 
   ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
fundCode as product_code,
businessType as business_type,
balance,
proctime
  from lscsp_sc_order_all where fundCode in ('007118','007117') and 
businessType in ('5') ) as U
left join
dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
on U.sor_pty_id = C.cust_id
group by sor_pty_id,
entrust_date,
entrust_time,
product_code,
business_type,
COALESCE(C.cust_name, '--'),
COALESCE(C.open_comp_name, '--'),
COALESCE(C.open_comp_id, '--'),
COALESCE(C.org_name,'--'),
COALESCE(C.org_id,'--'),
COALESCE(C.comp_name, '--'),
COALESCE(C.comp_id,'--'),
COALESCE(C.mng_name,'--'),
COALESCE(C.mng_id,'--'),
COALESCE(C.is_tg,'--'),
COALESCE(C.cust_type,'--'),
COALESCE(C.avg_tot_aset_y365, 0.00),
COALESCE(C.avg_aset_create_y, 0.00)




2020-03-01 17:22:06,504 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
at 
org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
at 
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
at 
org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGra

Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 文章 sunfulin
Hi, guys
I am running Flink 1.10 SQL job with UpsertStreamSink to ElasticSearch. In my 
sql logic, I am using a UDF like ts2Date to handle date format stream fields. 
However, when I add the `env.enableCheckpointing(time)`, my job failed to 
submit and throws exception like following. This is really weird, cause when I 
remove the UDF, the job can submit successfully. Any suggestion is highly 
appreciated. Besides, my sql logic is like : 



INSERT INTO realtime_product_sell
select U.sor_pty_id,
   U.entrust_date,
   U.entrust_time,
   U.product_code,
   U.business_type,
   sum(cast(U.balance as double)) as balance,
   COALESCE(C.cust_name, '--') as cust_name,
   COALESCE(C.open_comp_name, '--') AS open_comp_name,
   COALESCE(C.open_comp_id, '--') as open_comp_id,
   COALESCE(C.org_name,'--') as org_name,
   COALESCE(C.org_id,'--') as comp_name,
   COALESCE(C.comp_name, '--') AS comp_name,
   COALESCE(C.comp_id,'--') as comp_id,
   COALESCE(C.mng_name,'--') as mng_name,
   COALESCE(C.mng_id,'--') as mng_id,
   COALESCE(C.is_tg,'--') as is_tg,
   COALESCE(C.cust_type,'--') as cust_type,
   COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
   COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
from
(select customerNumber as sor_pty_id,
ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF 
   ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
fundCode as product_code,
businessType as business_type,
balance,
proctime
  from lscsp_sc_order_all where fundCode in ('007118','007117') and 
businessType in ('5') ) as U
left join
dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
on U.sor_pty_id = C.cust_id
group by sor_pty_id,
entrust_date,
entrust_time,
product_code,
business_type,
COALESCE(C.cust_name, '--'),
COALESCE(C.open_comp_name, '--'),
COALESCE(C.open_comp_id, '--'),
COALESCE(C.org_name,'--'),
COALESCE(C.org_id,'--'),
COALESCE(C.comp_name, '--'),
COALESCE(C.comp_id,'--'),
COALESCE(C.mng_name,'--'),
COALESCE(C.mng_id,'--'),
COALESCE(C.is_tg,'--'),
COALESCE(C.cust_type,'--'),
COALESCE(C.avg_tot_aset_y365, 0.00),
COALESCE(C.avg_aset_create_y, 0.00)




2020-03-01 17:22:06,504 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
at 
org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
at 
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
at 
org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at 
org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at 

Re:Flink 1.10连接hive时kerberos认证异常问题

2020-02-24 文章 sunfulin


Hi,
我在配置flink连接hive时,由于集群开启了Kerberos认证,经过一番探索,异常没有了。但是现在连接的时候需要我输入Kerberos用户名和密码。我理解指定了keytab文件路径后,应该不需要用户名和密码了吧?请教各位大神可能的配置问题。


security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: 
/app/flink/flink-1.10.10/kerberos/flink_test.keytab
security.kerberos.login.principal: flink_t...@hadoop.htsc.com
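
另外两个常被一起用到的配置项(假设性示例,键名以 1.10 官方配置文档为准):security.kerberos.login.contexts 用来声明把这份 keytab 凭证提供给哪些 JAAS 上下文(比如 ZooKeeper、Kafka);客户端找不到 KDC 配置时,一般还要给 JVM 指定 krb5.conf:

security.kerberos.login.contexts: Client,KafkaClient
env.java.opts: -Djava.security.krb5.conf=/etc/krb5.conf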









At 2020-02-21 18:18:57, "sunfulin"  wrote:

Hi,
我使用Flink 
1.10集成hive,在连接metastore的时候由于hive对应CDH集群开启了kerberos认证,抛出了如下异常:请问大家这个该怎么配置或者解决哈?


999  [main] INFO  hive.metastore  - Trying to connect to metastore with URI 
thrift://namenode01.htsc.com:9083
1175 [main] ERROR org.apache.thrift.transport.TSaslTransport  - SASL 
negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]
  at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
  at 
org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
  at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
  at 
org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:420)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:236)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:181)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:86)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:118)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:43)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:71)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
  at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:188)
  at 
org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:102)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:235)
  at 
com.htsc.crm_realtime.fatjob.Jobs.hive.HiveMetaJob.doJob(HiveMetaJob.java:44)
  at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
  at 
com.htsc.crm_realtime.fatjob.Jobs.hive.HiveMetaJob.main(HiveMetaJob.java:23)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed 
to find any Kerberos tgt)
  at 
sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
  at 
sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
  at 
sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
  at 
sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
  at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
  at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
  at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
  ... 34 more




 

Flink 1.10连接hive时kerberos认证异常问题

2020-02-21 文章 sunfulin
Hi,
我使用Flink 
1.10集成hive,在连接metastore的时候由于hive对应CDH集群开启了kerberos认证,抛出了如下异常:请问大家这个该怎么配置或者解决哈?


999  [main] INFO  hive.metastore  - Trying to connect to metastore with URI 
thrift://namenode01.htsc.com:9083
1175 [main] ERROR org.apache.thrift.transport.TSaslTransport  - SASL 
negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]
  at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
  at 
org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
  at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
  at 
org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:420)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:236)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:181)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:86)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:118)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:43)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:71)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
  at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:188)
  at 
org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:102)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:235)
  at 
com.htsc.crm_realtime.fatjob.Jobs.hive.HiveMetaJob.doJob(HiveMetaJob.java:44)
  at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
  at 
com.htsc.crm_realtime.fatjob.Jobs.hive.HiveMetaJob.main(HiveMetaJob.java:23)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed 
to find any Kerberos tgt)
  at 
sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
  at 
sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
  at 
sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
  at 
sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
  at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
  at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
  at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
  ... 34 more
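
A minimal sketch of one way to address this on the client side, assuming a keytab is available; the principal, keytab path and class name below are placeholders rather than values from this thread. The idea is to obtain a Kerberos TGT before HiveCatalog#open creates the metastore client (when the job runs on a cluster, Flink's security.kerberos.login.keytab and security.kerberos.login.principal options in flink-conf.yaml serve the same purpose):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosLoginSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders -- replace with the real principal and keytab of your CDH cluster.
        String principal = "flinkuser@EXAMPLE.COM";
        String keytab = "/path/to/flinkuser.keytab";

        // Tell the Hadoop security layer that Kerberos is in use, then log in from the keytab.
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(hadoopConf);
        UserGroupInformation.loginUserFromKeytab(principal, keytab);

        // From here on, register the HiveCatalog as before; the SASL handshake with the
        // metastore should now find a valid TGT instead of failing with GSSException.
    }
}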

Re:Re: Exception when writing to ES with the Flink 1.10 blink planner

2020-02-15 post by sunfulin
OK, thanks. I have attached the query SQL in the user@ thread.











At 2020-02-15 16:14:56, "Jark Wu"  wrote:
>Hi sunfulin,
>
>This exception means that a primary key cannot be inferred from the query, not that the sink has no primary key. As for why the
>query's primary key cannot be inferred, we would need to look at the query itself.
>I saw that you also posted to the user@ list and I have replied there; let's continue the discussion under that user@ thread. Please also share your SQL, including
>the DDL.
>
>Best,
>Jark
>
>On Fri, 14 Feb 2020 at 23:03, sunfulin  wrote:
>
>> Hi,
>> I am using Flink 1.10 with the Blink Planner enabled. When trying to write to ES in upsert mode (the SQL is simply insert into
>> table select xxx group by x), the following exception is thrown:
>> When I instead try to declare a primary key on the ES table sink via DDL, the runtime says that primary key and unique
>> key are not supported yet. That looks like a contradiction.
>>
>>
>> The key point: when I switch back to the old planner, everything works. Could this be a bug in the Blink Planner? Any advice is much appreciated.
>>
>>
>> org.apache.flink.table.api.TableException: UpsertStreamTableSink requires
>> that Table has a full primary keys if it is updated.
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> at
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:82)
>> at
>> com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.doJob(ZhangleClientComputeTask.java:80)
>> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
>> at
>> com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.main(ZhangleClientComputeTask.java:27)
>>
>>
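
A toy illustration of the query shape this discussion is about (not the thread's actual job; it assumes a table environment named tableEnv and made-up table names): for an upsert sink the blink planner derives the unique key from the GROUP BY columns, so those columns have to reach the outermost SELECT unchanged.

// The grouping key "user_id" is projected as-is, so the planner can infer it as
// the upsert key of the sink.
tableEnv.sqlUpdate(
    "INSERT INTO es_upsert_sink " +
    "SELECT user_id, COUNT(*) AS cnt " +
    "FROM kafka_source " +
    "GROUP BY user_id");

// If the outer SELECT wraps the key in an expression, or the key columns are lost
// through further projections or joins, the planner may no longer be able to infer
// a unique key and fails with the UpsertStreamTableSink exception quoted above.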


Re:Re: Flink DataTypes json parse exception

2020-02-11 post by sunfulin
Hi,
I am using the latest Flink 1.10 RC. When I run the same code on Flink
1.8.2 there is no problem, but on 1.10 the issue occurs.
I am confused about the underlying reason.











At 2020-02-11 18:33:50, "Timo Walther"  wrote:
>Hi,
>
>from which Flink version are you upgrading? There were some changes in 
>1.9 for how to parse timestamps in JSON format.
>
>Your error might be related to those changes:
>
>https://issues.apache.org/jira/browse/FLINK-11727
>
>I hope this helps.
>
>Timo
>
>
>On 07.02.20 07:57, sunfulin wrote:
>> Hi, guys
>> When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema 
>> defination.
>> I am reading and consuming records from kafka with json schema like   
>> {"user_id":1866071598998,"recv_time":1547011832729}. and the code snippet is 
>> :
>> 
>> 
>> 
>> .withSchema(
>>  new Schema()
>>  // eventTime
>>  .field("rowtime", 
>> DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)).rowtime(
>>  new Rowtime()
>>  .timestampsFromField("recv_time")
>>  .watermarksPeriodicBounded(1000)
>>  )
>>  .field("user_id", DataTypes.STRING())
>> 
>> 
>> 
>> 
>> 
>> 
>> But, I am running an issue and got exception like the following:
>> 
>> 
>> Caused by: java.time.format.DateTimeParseException: Text '1549705104542' 
>> could not be parsed at index 0
>> at 
>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>> 
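
One possible workaround, sketched under the assumption that recv_time really arrives as epoch milliseconds: declare the field as BIGINT in the schema and convert it explicitly, instead of asking the 1.10 JSON format to parse a plain number as TIMESTAMP(3). The class and function names below are made up for illustration.

import java.sql.Timestamp;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical helper UDF: turns epoch milliseconds (read as BIGINT) into a SQL timestamp.
public class EpochMillisToTimestamp extends ScalarFunction {
    public Timestamp eval(Long epochMillis) {
        return epochMillis == null ? null : new Timestamp(epochMillis);
    }
}

Register it with tableEnv.registerFunction("to_ts", new EpochMillisToTimestamp()) and declare the source field as DataTypes.BIGINT(). Note that this only sidesteps the parsing step; whether the converted value can still back the rowtime attribute depends on how the watermark is defined.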


Re:Re: Flink connect hive with hadoop HA

2020-02-10 post by sunfulin
Hi, guys
Thanks for the kind replies. What I actually want to know is how to set the client-side Hadoop
configuration when using the Table API inside my program. Any suggestions would help.











At 2020-02-11 02:42:31, "Bowen Li"  wrote:

Hi sunfulin,


Sounds like you didn't config the hadoop HA correctly on the client side 
according to [1]. Let us know if it helps resolve the issue.


[1] 
https://stackoverflow.com/questions/25062788/namenode-ha-unknownhostexception-nameservice1









On Mon, Feb 10, 2020 at 7:11 AM Khachatryan Roman  
wrote:

Hi,


Could you please provide a full stacktrace?


Regards,
Roman




On Mon, Feb 10, 2020 at 2:12 PM sunfulin  wrote:

Hi, guys
I am using Flink 1.10 and test functional cases with hive intergration. Hive 
with 1.1.0-cdh5.3.0 and with hadoop HA enabled.Running flink job I can see 
successful connection with hive metastore, but cannot read table data with 
exception:


java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:668)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:604)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)


I am running a standalone application. Looks like I am missing my hadoop conf 
file in my flink job application classpath. Where should I config ?
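
A sketch of the usual setup, assuming the CDH client configs (core-site.xml plus the hdfs-site.xml that defines nameservice1) have been copied to the machine running the job; the paths, catalog name and Hive version string below are placeholders:

import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSetup {
    public static HiveCatalog create() {
        // hive-site.xml is picked up from this directory by the HiveCatalog itself.
        String hiveConfDir = "/etc/hive/conf";

        // The HA mapping for "nameservice1" lives in the Hadoop client config, which must
        // be visible to the process, e.g. by exporting HADOOP_CONF_DIR=/etc/hadoop/conf
        // before launching the job, so that hdfs://nameservice1 resolves to the active
        // NameNode instead of failing with UnknownHostException.
        return new HiveCatalog("myhive", "default", hiveConfDir, "1.1.0");
    }
}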




 

Flink connect hive with hadoop HA

2020-02-10 post by sunfulin
Hi, guys
I am using Flink 1.10 and test functional cases with hive intergration. Hive 
with 1.1.0-cdh5.3.0 and with hadoop HA enabled.Running flink job I can see 
successful connection with hive metastore, but cannot read table data with 
exception:


java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:668)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:604)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)


I am running a standalone application. Looks like I am missing my hadoop conf 
file in my flink job application classpath. Where should I config ?

Flink DataTypes json parse exception

2020-02-06 post by sunfulin
Hi, guys
When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema 
defination.
I am reading and consuming records from kafka with json schema like   
{"user_id":1866071598998,"recv_time":1547011832729}. and the code snippet is :



.withSchema(
new Schema()
// eventTime
.field("rowtime", 
DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)).rowtime(
new Rowtime()
.timestampsFromField("recv_time")
.watermarksPeriodicBounded(1000)
)
.field("user_id", DataTypes.STRING())






But, I am running an issue and got exception like the following:


Caused by: java.time.format.DateTimeParseException: Text '1549705104542' could 
not be parsed at index 0
at 
java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)



Re: Reply: Flink job consumes no data from Kafka

2020-01-09 post by sunfulin
Thanks for the reply. After some digging it was indeed a hostname configuration issue.


The job then hit another problem. Below is the Kafka connector configuration used for reading, parsed with a JSON schema. At runtime, however, the following exception is thrown. Does anyone know what could cause this?

Caused by: java.lang.NullPointerException: Null result cannot be used for 
atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)

 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

 at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

 at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

 at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

 at DataStreamSourceConversion$2.processElement(Unknown Source)

 at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)

 at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.






tableEnv.connect(
        new Kafka()
            .version(kafkaInstance.getVersion())
            .topic(chooseKafkaTopic(initPack.clusterMode))
            .property("bootstrap.servers", kafkaInstance.getBrokerList())
            .property("group.id", initPack.jobName)
            .startFromEarliest()   // for testing only; can be removed in production
    ).withSchema(
        new Schema()
            // timestamp field
            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
                new Rowtime()
                    .timestampsFromField("time")
                    .watermarksPeriodicBounded(1000)
            )
            .field("type", Types.STRING)
            .field("event", Types.STRING)
            .field("user_id", Types.STRING)
            .field("distinct_id", Types.STRING)
            .field("project", Types.STRING)
            .field("recv_time", Types.SQL_TIMESTAMP)
            .field("properties", Types.ROW_NAMED(
                new String[] { "BROWSER_VERSION", "pathname", "search",
                        "eventType", "message", "stack", "componentStack" },
                Types.STRING, Types.STRING, Types.STRING, Types.STRING,
                Types.STRING, Types.STRING, Types.STRING)
            )
    ).withFormat(
        new Json().failOnMissingField(false)
            .deriveSchema()
    )
    .inAppendMode()
    .registerTableSource(getTableName());
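
A frequent cause of the "Null result cannot be used for atomic types" NPE is that a field mapped to an atomic type, for instance the "time" field backing the rowtime or recv_time, is missing or null in some messages; with failOnMissingField(false) a missing field silently becomes null. A small stand-alone check, sketched here with field names mirroring the schema above (it is not part of the job), can confirm whether the raw messages really carry those fields:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MissingFieldCheck {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns true only if the fields that map to atomic types are present and non-null.
    public static boolean hasRequiredFields(String rawJson) throws Exception {
        JsonNode node = MAPPER.readTree(rawJson);
        return node.hasNonNull("time") && node.hasNonNull("recv_time");
    }
}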












At 2020-01-10 09:53:52, "Evan"  wrote:
>First, check the advertised.host.name setting of the Sensors Analytics Kafka brokers.
>
>
>
>-- Original message --
>From: "sunfulin"    Sent: Friday, January 10, 2020, 9:51 AM
>To: "user-zh@flink.apache.org"
>Subject: Flink job consumes no data from Kafka
>
>
>
>I have a job that uses Flink to consume Kafka messages from Sensors Analytics (see https://manual.sensorsdata.cn/sa/latest/page-1573828.html). After the job starts, however, the Flink task consumes no Kafka messages at all, while the Sensors Analytics kafka
> console consumer shows a steady stream of data.
>The Flink job itself reports no errors. How should I troubleshoot this situation?
>本身flink作业job里没有任何报错,想问下大家这种情况该如何排查?


Flink job consumes no data from Kafka

2020-01-09 post by sunfulin
I have a job that uses Flink to consume Kafka messages from Sensors Analytics (see https://manual.sensorsdata.cn/sa/latest/page-1573828.html). After the job starts, however, the Flink task consumes no Kafka messages at all, while the Sensors Analytics kafka
 console consumer shows a steady stream of data.
The Flink job itself reports no errors. How should I troubleshoot this situation?

Re:Re: Re: Flink SQL Count Distinct performance optimization

2020-01-08 post by sunfulin
hi,
Thanks for the reply. I am using the default FsStateBackend rather than RocksDB,
with checkpointing off, so I cannot see any state info from the dashboard.
I will dig into more details and see whether anything else can be optimized.











At 2020-01-08 19:07:08, "Benchao Li"  wrote:
>hi sunfulin,
>
>As Kurt pointed out, if you use RocksDB state backend, maybe slow disk IO
>bound your job.
>You can check WindowOperator's latency metric to see how long it tasks to
>process an element.
>Hope this helps.
>
>On Wed, Jan 8, 2020 at 4:04 PM sunfulin  wrote:
>
>> Ah, I had checked resource usage and GC from flink dashboard. Seem that
>> the reason is not cpu or memory issue. Task heap memory usage is less then
>> 30%. Could you kindly tell that how I can see more metrics to help target
>> the bottleneck?
>> Really appreciated that.
>>
>>
>>
>>
>>
>> At 2020-01-08 15:59:17, "Kurt Young"  wrote:
>>
>> Hi,
>>
>> Could you try to find out what's the bottleneck of your current job? This
>> would leads to
>> different optimizations. Such as whether it's CPU bounded, or you have too
>> big local
>> state thus stuck by too many slow IOs.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:
>>
>>> hi sunfulin,
>>> you can try with blink planner (since 1.9 +), which optimizes distinct
>>> aggregation. you can also try to enable
>>> *table.optimizer.distinct-agg.split.enabled* if the data is skew.
>>>
>>> best,
>>> godfreyhe
>>>
>>> On Wed, Jan 8, 2020 at 3:39 PM sunfulin  wrote:
>>>
>>>> Hi, community,
>>>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>>>> With one scenario I'm trying to count(distinct deviceID) over about 100GB
>>>> data set in realtime, and aggregate results with sink to ElasticSearch
>>>> index. I met a severe performance issue when running my flink job. Wanner
>>>> get some help from community.
>>>>
>>>>
>>>> Flink version : 1.8.2
>>>> Running on yarn with 4 yarn slots per task manager. My flink task
>>>> parallelism is set to be 10, which is equal to my kafka source partitions.
>>>> After running the job, I can observe high backpressure from the flink
>>>> dashboard. Any suggestions and kind of help is highly appreciated.
>>>>
>>>>
>>>> running sql is like the following:
>>>>
>>>>
>>>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>>>
>>>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
>>>> clkCnt  from
>>>>
>>>> (
>>>>
>>>> SELECT
>>>>
>>>>  aggId,
>>>>
>>>>  pageId,
>>>>
>>>>  statkey,
>>>>
>>>>  COUNT(DISTINCT deviceId) as cnt
>>>>
>>>>  FROM
>>>>
>>>>  (
>>>>
>>>>  SELECT
>>>>
>>>>  'ZL_005' as aggId,
>>>>
>>>>  'ZL_UV_PER_MINUTE' as pageId,
>>>>
>>>>  deviceId,
>>>>
>>>>  ts2Date(recvTime) as statkey
>>>>
>>>>  from
>>>>
>>>>  kafka_zl_etrack_event_stream
>>>>
>>>>  )
>>>>
>>>>  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>>>
>>>> ) as t1
>>>>
>>>> group by aggId, pageId, statkey
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Best
>>>
>>>
>>
>>
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn
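
For reference, per-operator latency metrics like the one mentioned above are only reported once latency tracking is switched on; a minimal sketch, with an arbitrary interval:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Emit latency markers every 30 seconds so latency metrics show up per operator.
        env.getConfig().setLatencyTrackingInterval(30_000L);
        // ... build the Table/SQL job on top of this environment as usual
    }
}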


Viewing Flink operator state

2020-01-08 post by sunfulin
How can I view statistics such as state size from the dashboard if checkpointing is not enabled?
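
Without checkpointing the web UI has no state-size figures to report; the closest built-in option is to enable checkpointing, after which the Checkpoints tab shows the checkpointed data size per operator. A minimal sketch, with a placeholder interval and checkpoint URI:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 seconds; the dashboard's Checkpoints tab then reports the
        // checkpointed state size of each operator.
        env.enableCheckpointing(60_000L);
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
        // ... define and execute the job as usual
    }
}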

Re:Re: Flink SQL Count Distinct performance optimization

2020-01-08 post by sunfulin
Ah, I had already checked resource usage and GC from the Flink dashboard. It seems the
bottleneck is not a CPU or memory issue; task heap memory usage is less than 30%.
Could you kindly tell me how I can see more metrics to help locate the
bottleneck?
Really appreciate it.








At 2020-01-08 15:59:17, "Kurt Young"  wrote:

Hi,


Could you try to find out what's the bottleneck of your current job? This would 
leads to 
different optimizations. Such as whether it's CPU bounded, or you have too big 
local 
state thus stuck by too many slow IOs. 


Best,

Kurt





On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:

hi sunfulin,  
you can try with blink planner (since 1.9 +), which optimizes distinct 
aggregation. you can also try to enable 
table.optimizer.distinct-agg.split.enabled if the data is skew.


best,
godfreyhe


On Wed, Jan 8, 2020 at 3:39 PM sunfulin  wrote:

Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. With 
one scenario I'm trying to count(distinct deviceID) over about 100GB data set 
in realtime, and aggregate results with sink to ElasticSearch index. I met a 
severe performance issue when running my flink job. Wanner get some help from 
community.


Flink version : 1.8.2
Running on yarn with 4 yarn slots per task manager. My flink task parallelism 
is set to be 10, which is equal to my kafka source partitions. After running 
the job, I can observe high backpressure from the flink dashboard. Any 
suggestions and kind of help is highly appreciated.


running sql is like the following:


INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)

select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt  
from

(

SELECT

 aggId,

 pageId,

 statkey,

 COUNT(DISTINCT deviceId) as cnt

 FROM

 (

 SELECT

 'ZL_005' as aggId,

 'ZL_UV_PER_MINUTE' as pageId,

 deviceId,

 ts2Date(recvTime) as statkey

 from

 kafka_zl_etrack_event_stream

 )

 GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)

) as t1

group by aggId, pageId, statkey
















Best

Re:Re: Flink SQL Count Distinct performance optimization

2020-01-08 post by sunfulin


hi, godfreyhe
As far as I can tell, I have already rewritten the running SQL from a single count-distinct
into a two-level aggregation, which is essentially what the table.optimizer.distinct-agg.split.enabled
option would do. Correct me if I am wrong, but the rewritten SQL still does not
improve the overall throughput much.
For now I cannot use the blink planner in my apps, because the current production
environment has no plan to upgrade to Flink 1.9+ yet.
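
For reference, once an upgrade to 1.9+ becomes possible, the option godfreyhe mentions is enabled through TableConfig on the blink planner; a minimal sketch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SplitDistinctAggSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Let the planner rewrite COUNT(DISTINCT ...) into a two-level aggregation
        // automatically, instead of hand-writing the MOD(hashCode(...), 1024) bucketing.
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.distinct-agg.split.enabled", "true");
        // ... register sources and sinks, then run the original single-level query
    }
}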







At 2020-01-08 15:52:28, "贺小令"  wrote:

hi sunfulin,  
you can try with blink planner (since 1.9 +), which optimizes distinct 
aggregation. you can also try to enable 
table.optimizer.distinct-agg.split.enabled if the data is skew.


best,
godfreyhe


On Wed, Jan 8, 2020 at 3:39 PM sunfulin  wrote:

Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. With 
one scenario I'm trying to count(distinct deviceID) over about 100GB data set 
in realtime, and aggregate results with sink to ElasticSearch index. I met a 
severe performance issue when running my flink job. Wanner get some help from 
community.


Flink version : 1.8.2
Running on yarn with 4 yarn slots per task manager. My flink task parallelism 
is set to be 10, which is equal to my kafka source partitions. After running 
the job, I can observe high backpressure from the flink dashboard. Any 
suggestions and kind of help is highly appreciated.


running sql is like the following:


INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)

select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt  
from

(

SELECT

 aggId,

 pageId,

 statkey,

 COUNT(DISTINCT deviceId) as cnt

 FROM

 (

 SELECT

 'ZL_005' as aggId,

 'ZL_UV_PER_MINUTE' as pageId,

 deviceId,

 ts2Date(recvTime) as statkey

 from

 kafka_zl_etrack_event_stream

 )

 GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)

) as t1

group by aggId, pageId, statkey
















Best

Flink SQL Count Distinct performance optimization

2020-01-07 post by sunfulin
Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. With 
one scenario I'm trying to count(distinct deviceID) over about 100GB data set 
in realtime, and aggregate results with sink to ElasticSearch index. I met a 
severe performance issue when running my flink job. Wanner get some help from 
community.


Flink version : 1.8.2
Running on yarn with 4 yarn slots per task manager. My flink task parallelism 
is set to be 10, which is equal to my kafka source partitions. After running 
the job, I can observe high backpressure from the flink dashboard. Any 
suggestions and kind of help is highly appreciated. 


running sql is like the following:


INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)

select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt  
from

(

SELECT

 aggId,

 pageId,

 statkey,

 COUNT(DISTINCT deviceId) as cnt

 FROM

 (

 SELECT

 'ZL_005' as aggId,

 'ZL_UV_PER_MINUTE' as pageId,

 deviceId,

 ts2Date(recvTime) as statkey

 from

 kafka_zl_etrack_event_stream

 )

 GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)

) as t1

group by aggId, pageId, statkey
















Best