回复: 如何快速定位拖慢速度的 operator

2020-06-29 文章 aven . wu
如果算子都在一个group里面的话确实在webui上不好看出背压问题,可以将operator chain 拆开。

• StreamExecutionEnvironment.disableOperatorChaining():关闭整个Job的OperatorChain
• 
someStream.filter(...).map(...).startNewChain().map():startNewChain()是指从当前Operator[map]开始一个新的chain,即:两个map会chaining在一起而filter不会(因为startNewChain的存在使得第一次map与filter断开了chain)。
• 
someStream.map(...).disableChaining():disableChaining()是指当前Operator[map]禁用OperatorChain,即:Operator[map]会独自占用一个Task。
• 
someStream.map(...).slotSharingGroup("name"):默认情况下所有Operator的slotGroup都为default,可以通过slotSharingGroup()进行自定义,Flink会将拥有相同slotGroup名称的Operators运行在相同Slot内,不同slotGroup名称的Operators运行在其他Slot内。
希望对你有帮助

Best
Aven

发件人: 徐骁
发送时间: 2020年6月28日 10:16
主题: Re: 如何快速定位拖慢速度的 operator

好的 感谢两位我试试

Sun.Zhu <17626017...@163.com> 于2020年6月25日周四 下午11:19写道:

> 虽然chain在一起,但是可以通过metrics中看出来各个算子的各项指标的
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月25日 00:51,徐骁 写道:
> 两个方法确实可以, 但是要追踪起来很废时间, 对小白太不友好啊
>



如何做Flink Stream的性能测试

2020-06-15 文章 aven . wu
各位好;
   最近我想测试一下我的程序处理性能如何。请问有什么工具、或者应该通过什么方法来获得一个比较准确的测试结果。
我的场景包含从kafka读取,flink 处理(有查询es做维表关联),处理结果输出到ES 和 Kafka。
Best
Aven


回复: 在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout [30000]

2020-05-13 文章 aven . wu
Hi 
根据你的情况,flink 写入ES 超时,一般是ES吞吐不足造成的,可以看一下官方的建议
https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
另外,es写入失败可以自定义一个 ActionRequestFailureHandler,你可以加入失败队列或者重试等等。
希望可以帮助到你。

Best
Aven

发件人: Yangze Guo
发送时间: 2020年5月13日 16:21
收件人: user-zh@flink.apache.org
主题: Re: 在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout 
[3]

您好,请问您的日志中有没有如语句

- Failed Elasticsearch bulk request:
- Failed Elasticsearch item request:
如果有,可以提供一下

从错误上看,应该是和es交互超时了,检查一下网络连通情况,或者将timeout调大,具体方法见文档[1]

esSinkBuilder.setRestClientFactory(
  restClientBuilder -> {
restClientBuilder.setDefaultHeaders(...)
restClientBuilder.setMaxRetryTimeoutMillis(...)
restClientBuilder.setPathPrefix(...)
restClientBuilder.setHttpClientConfigCallback(...)
  }
);

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/elasticsearch.html#elasticsearch-sink


Best,
Yangze Guo

On Wed, May 13, 2020 at 2:53 PM Jim Chen  wrote:
>
> 大家好,
>
> 我在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout
> [3],报错信息如下:
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> 

Flink 严重背压问题排查

2020-05-11 文章 aven . wu
Hello 大家好
今天遇到个Flink 背压的问题,导致程序完全卡主不在处理数据,从监控页面看是应该是 keyProcess-> sink :alarm state  
处理数据有问题,导致上游 ruleProcess 出现背压。
KeyProcess 是中定义了一个MapState,每来一条数据会读取和更新state中的内容。Sink 是写入kafka已排除不是kafka的问题
http://qiniu.lgwen.cn/13F2DE58-98C1-4851-B54A-6BDC3C646169.png,
http://qiniu.lgwen.cn/image/jvm.png

Dump了 堆栈日志
http://qiniu.lgwen.cn/docstack.log
没什么排查的思路,如果方便的话,提供一些排查的思路。

Best
Aven



Flink on YARN 使用Kerboros认证失败

2020-03-24 文章 aven . wu
Flink 提交作业到有kerboros认证的集群报以下异常

java.lang.Exception: unable to establish the security context
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:276)
at 
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:312)
at 
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:84)
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
... 5 more
Caused by: KrbException: Cannot locate default realm
at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
... 11 more

使用了官网提供的四个参数,配置在了flink-conf.yaml里

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/flink-1.8.0/conf/flink.keytab
security.kerberos.login.principal: flink/hado...@example.com
security.kerberos.login.realm: EXAMPLE.COM
security.kerberos.login.contexts: KafkaClient

/home/flink-1.8.0/conf/flink.keytab 文件已放好,


Best
Aven



回复: 关于Flink 命令行参数广播的问题

2020-03-11 文章 aven . wu
Hello
还有一个问题,除了在算子的Open方法中获取这个参数还有别的地方可以获取吗?或者在Gobgraph启动的生命周期里面有哪个阶段可以被调用提执行一些用户代码。
我的需求是需要通过命令行参数初始化一些静态类的属性,全局的静态类会被算子调用以执行某些通用的功能,如果我在open方法中去初始化的话是不太优雅,并且为了保证可用必须在每个算子的Open方法中都调用,对于一些非Rich的算子使用静态方法就会有问题。

Best
Aven

发件人: zhisheng
发送时间: 2020年3月11日 21:16
收件人: user-zh
主题: Re: 关于Flink 命令行参数广播的问题

hi,aven.wu

可以使用 ParameterTool
获取到传入的参数,然后通过 env.getConfig().setGlobalJobParameters(parameterTool);

在算子中可以在 open 方法里面通过
getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 获取到配置

aven.wu  于2020年3月11日周三 下午3:42写道:

> Hi,大家好!
> 遇到一个问题,在使用flink run
> 提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。
>
> Best
> Aven
>
>



关于Flink 命令行参数广播的问题

2020-03-11 文章 aven . wu
Hi,大家好!
遇到一个问题,在使用flink run 
提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。

Best
Aven



回复: CliFrontend 未优先加载用户jar包中的class

2020-03-03 文章 aven . wu
感谢回答
后来我查了Flink run脚本的classpath设置,我修改了脚本将我的jar包指定在flink classpath的最前面得以解决问题

Best
Aven

发件人: tison
发送时间: 2020年3月3日 14:16
收件人: user-zh
主题: Re: CliFrontend 未优先加载用户jar包中的class

https://github.com/apache/flink/commit/0f30c263eebd2fc3ecbeae69a4ce9477e1d5d774

Best,
tison.


tison  于2020年3月3日周二 下午2:13写道:

> 1.9.2 和 1.10 上已经修复此问题,修改可参考
>
> https://issues.apache.org/jira/browse/FLINK-13749
>
> Best,
> tison.
>
>
> aven.wu  于2020年3月3日周二 下午2:04写道:
>
>> 组件版本 Hadoop 2.7.3,flink 1.9.1 ,elasticsearch6.5。
>> 该问题的起源是因为程序我的用户程序用Jackson,并依赖了Elasticsearch rest client
>> ,在Yarn集群上提交任务的时候出现了如下异常:
>> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
>> at
>> org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
>> 后上网查询后推论有可能是jackson版本问题,于是打印了类加载路径:
>> --main class jackson class load before
>> run--
>> file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
>> 果然是从hadoop的classpath下加载了2.2.3版本
>>
>> 之后查看flink run命令入口程序
>> CliFrontend#bulidProgram line 799
>> PackagedProgram#PackagedProgram line 221
>> JobWithJars#BuildUserCodeClassLoad line 142
>> return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
>> 默认使用parentFirst,根据官方文档里面描述的反向类加载,应该是首先从classpath下加载Class,而不是从user
>> jar包中加载类。
>> 请问如何修改此处的类加载顺序,优先从user jar 中加载class
>>
>> Best
>> Aven
>>
>>



CliFrontend 未优先加载用户jar包中的class

2020-03-02 文章 aven . wu
组件版本 Hadoop 2.7.3,flink 1.9.1 ,elasticsearch6.5。
该问题的起源是因为程序我的用户程序用Jackson,并依赖了Elasticsearch rest client ,在Yarn集群上提交任务的时候出现了如下异常:
java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
at 
org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
后上网查询后推论有可能是jackson版本问题,于是打印了类加载路径:
--main class jackson class load before 
run--
file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
果然是从hadoop的classpath下加载了2.2.3版本

之后查看flink run命令入口程序
CliFrontend#bulidProgram line 799
PackagedProgram#PackagedProgram line 221
JobWithJars#BuildUserCodeClassLoad line 142 
return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
默认使用parentFirst,根据官方文档里面描述的反向类加载,应该是首先从classpath下加载Class,而不是从user jar包中加载类。
请问如何修改此处的类加载顺序,优先从user jar 中加载class

Best
Aven



回复: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema

2020-01-03 文章 aven . wu
Hi Jingsong
感谢指点,使用DataStream 解决了我目前的问题。
对于RowTypeInfo的设置可能有些隐晦(指在创建Datastream时就需要指定)。
希望之后对tableenv.registerStream 
API能有更好更直接的方式来设置RowTypeInfo以及一些相关可能的信息。(包括注册Datastream, 
Datastream, Datastream)

Best,
Aven

发件人: JingsongLee
发送时间: 2019年12月31日 17:03
收件人: user-zh
主题: Re: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

Hi aven,

这是个合理的需求。
现在的问题是:
- Flink table只支持Row, Pojo, Tuple, CaseClass作为结构化的数据类型。
- 
而你的类型是JSONObject,它其实也是一个结构化的数据类型,但是目前Flink不支持它,所以可以考虑有这样的DeserializationSchema机制来支持它。

但是我理解其实没有差别多少,比如你提供RowDeserializationSchema,其实就是JSONObject到Row的转换,那你完全可以把这个套在DataStream.map中,把它转换成Flink
 table支持的结构化类型。

Best,
Jingsong Lee


--
From:aven.wu 
Send Time:2019年12月31日(星期二) 14:09
To:user-zh@flink.apache.org 
Subject:回复: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

你好!
“把 JSONObject类型定义成object类型” 可以解决在确定字段和类型的情况下并且需要编码到程序中。
如果能开放这部分的能力,可以不通过编码(新增POJO)的方式来完成一个Datastream 到 stream 的table注册。

best wish
发送自 Windows 10 版邮件应用

发件人: Terry Wang
发送时间: 2019年12月30日 12:37
收件人: user-zh@flink.apache.org
主题: Re: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

你这种需求的一种解决思路,可以把 
JSONObject类型定义成object类型,然后注册成table之后通过一个UDTF把JSONObject转换成特定的schema。

Best,
Terry Wang



> 2019年12月27日 19:56,aven.wu  写道:
> 
> StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 
> 是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema.
> 
> 
> 发送自 Windows 10 版邮件应用
> 





回复: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema

2019-12-30 文章 aven . wu
你好!
“把 JSONObject类型定义成object类型” 可以解决在确定字段和类型的情况下并且需要编码到程序中。
如果能开放这部分的能力,可以不通过编码(新增POJO)的方式来完成一个Datastream 到 stream 的table注册。

best wish
发送自 Windows 10 版邮件应用

发件人: Terry Wang
发送时间: 2019年12月30日 12:37
收件人: user-zh@flink.apache.org
主题: Re: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

你这种需求的一种解决思路,可以把 
JSONObject类型定义成object类型,然后注册成table之后通过一个UDTF把JSONObject转换成特定的schema。

Best,
Terry Wang



> 2019年12月27日 19:56,aven.wu  写道:
> 
> StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 
> 是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema.
> 
> 
> 发送自 Windows 10 版邮件应用
> 




StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema

2019-12-27 文章 aven . wu
StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 
是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema.


发送自 Windows 10 版邮件应用



[SQL] [TableAPI] Table.sqlQuery(sql) 和 tableSink 的 table schema 类型不匹配

2019-12-17 文章 aven . wu

Hi!
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Field types of query result and registered TableSink [aggregationTableSink] do 
not match.

SQL = SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as 
tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND)
使用table.sqlQuery(SQL),返回的table schema 是 Query result schema: [cnt: Long, 
tumTime: Timestamp]。
而使用 
JsonRowSchemaConverter.convert("{" +
"type:'object'," +
"properties:{" +
"cnt: {" +
"type: 'number'" +
"}," +
"tumTime:{" +
"type:'string'," +
"format:'date-time'" +
"}" +
"}" +
“}");
创建Elasticsearch6UpsertTableSink table schema 是 TableSink schema:   [cnt: 
BigDecimal, tumTime: Timestamp]
而且我看了 JsonRowSchemaConverter.convert 所有的数字类型都被转成BigDecimal,导致SQL返回的schema 和 
json定义的schema无法匹配。

请问是我使用的问题还是说框架存在这个问题?

附上源代码:

public class AggregationFunction {



public static void main(String[] args) {
String sql = "SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL 
'10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' 
SECOND)";
StreamExecutionEnvironment senv = 
StreamExecutionEnvironment.getExecutionEnvironment();
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
DataStream source = senv.addSource(new SourceFunction() {
@Override
public void run(SourceContext sourceContext) throws Exception 
{
int i = 1000;
String[] names = {"Hanmeimei", "Lilei"};
while (i > 1) {
sourceContext.collect(new User(names[i%2], i, new 
Timestamp(System.currentTimeMillis(;
Thread.sleep(10);
i--;
}
}
@Override
public void cancel() {

}
});
tenv.registerDataStream("abc", source, "name, age, timestamp, 
rowtime.rowtime");
Table table = tenv.sqlQuery(sql);
List hosts = Arrays.asList(new Host("10.20.128.210", 19201, 
"http"));
TypeInformation typeInformation = 
JsonRowSchemaConverter.convert("{" +
"type:'object'," +
"properties:{" +
"cnt: {" +
"type: 'number'" +
"}," +
"tumTime:{" +
"type:'string'," +
"format:'date-time'" +
"}" +
"}" +
"}");
RowTypeInfo typeInfo = (RowTypeInfo) typeInformation;
TypeInformation[] typeInformations = typeInfo.getFieldTypes();

String[] fieldNames = typeInfo.getFieldNames();
TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < typeInformations.length; i ++) {
builder.field(fieldNames[i], typeInformations[i]);
}
Elasticsearch6UpsertTableSink establesink = new 
Elasticsearch6UpsertTableSink(
true,
builder.build(),
hosts,
"aggregation",
"data",
"$",
"n/a",
new JsonRowSerializationSchema.Builder(typeInformation).build(),
XContentType.JSON,
new IgnoringFailureHandler(),
new HashMap<>()
);
tenv.registerTableSink("aggregationTableSink", establesink);
table.insertInto("aggregationTableSink");
}


@Data
@AllArgsConstructor
@NoArgsConstructor
public static class User {
private String name;

private Integer age;

private Timestamp timestamp;
}


}



best wish!


flink on k8s 如何指定用户程序的入口

2019-12-08 文章 aven . wu
各位好!
关于flink on k8s
看了官网的文档之后Dockerfile,docker-entrypoint.sh,job-cluster-job.yaml.template等文件有以下问题:
1 standalone 
启动jobmanager之后是如何知道用户程序的主入口(要执行的main方法时哪个?)如果是通过Maven打包时候设置的,那么如何不在打包时不设置,而通过命令行传入
 类似 on yarn 模式下的 -c
2 如果是在 standalone-job.sh 时指定用户程序的主入口,那么如何传入用户自定义参数(在用户主程序args[]中接收)?

发送自 Windows 10 版邮件应用



Kafka库和Flink的反向类加载方法不兼容

2019-11-23 文章 aven . wu
报错如下
cannot assign instance of org.apache.commons.collections.map.LinkedMap to field 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
 of type org.apache.commons.collections.map.LinkedMap in instance of 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

修改flink-conf.yaml
classloader.resolve-order: parent-first

哪位大佬能解释一下这个反向类加载是什么意思?

发送自Aven.wu



答复: 关于elasticSearch table sink 构造过于复杂

2019-08-26 文章 aven . wu
你好:
可以自己构建 indexRequest 设置id,type,source 等字段
 ElasticsearchSinkFunction 不知道是否满足你的需求?


发件人: Jark Wu
发送时间: 2019年8月26日 18:00
主题: Re: 关于elasticSearch table sink 构造过于复杂

> ETL作业, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.

据我所知,目前是不支持的。 可以去建个 JIRA 给社区提需求。


如果使用的 blink planner,可以使用 deduplicate with keeping first row,是一个比较轻量的去重计算,能拿到一个 
key (也就是去重 key)。
文档还在 review 中,可以先看这个PR: 
https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953
 



Best,
Jark


> 在 2019年8月26日,17:20,hb <343122...@163.com> 写道:
> 
> 没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es,
> 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.
> 
> 
> 
> 在 2019-08-26 15:47:53,"Jark Wu"  写道:
>> 嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。
>> 
>> Best,
>> Jark
>> 
>> 
>> 
>>> 在 2019年8月26日,16:44,巫旭阳  写道:
>>> 
>>> 感谢解答,
>>> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2019-08-26 16:39:49,"Jark Wu"  写道:
 Hi ,
 
 
 Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
 如果要注册一个 ES sink,可以使用 descriptor API,也就是 
 org.apache.flink.table.descriptors.Elasticsearch。
 或者使用 DDL 方式注册。
 
 
 Best,
 Jark
 
> 在 2019年8月26日,16:33,aven.wu  写道:
> 
> Elasticsearch6UpsertTableSink
> 的构造方法过于复杂参数非常多
> 
> public Elasticsearch6UpsertTableSink(
>boolean isAppendOnly,
>TableSchema schema,
>List hosts,
>String index,
>String docType,
>String keyDelimiter,
>String keyNullLiteral,
>SerializationSchema serializationSchema,
>XContentType contentType,
>ActionRequestFailureHandler failureHandler,
>Map sinkOptions) {
> 
> super(
>isAppendOnly,
>schema,
>hosts,
>index,
>docType,
>keyDelimiter,
>keyNullLiteral,
>serializationSchema,
>contentType,
>failureHandler,
>sinkOptions,
>UPDATE_REQUEST_FACTORY);
> }
> 
> 
> 请问,是不是我的用法不对?
> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>