Watermarks and parallelism

2020-05-13 Thread Gnanasoundari Soundarajan
Hi all,

I have the below queries about Flink. Could anyone help me understand?

Query:

1. Is the watermark maintained globally at the operator level?

2. When we have a keyBy operator with parallelism > 1, is there a single watermark 
maintained across all the parallel subtasks, or one for each parallel subtask?

3. Assuming I have a keyBy operator with parallelism > 1, is it possible to feed 
data to this operator from only one subtask of the previous operator (say, map(1) 
always goes to window(1))?

Regards,
Gnana


Flink Key based watermarks with event time

2020-05-13 Thread Gnanasoundari Soundarajan
Hi all,

We have a requirement to maintain key-based watermarks with event 
time. Each sensor communicates with a different timestamp, and we need to 
maintain a watermark separately for each sensor. Can this requirement be 
achieved with Flink?

Thanks.

Regards,
Gnana


Re: Re: How to use a Long timestamp as the event time in SQL DDL

2020-05-13 Thread zzh...@foxmail.com
Many thanks, Benchao Li. The UDF approach passed my test; an example of the SQL is as follows:

CREATE TABLE session_login (
    deal_time BIGINT
    ,deal_time_obj as DY_FROM_UNIXTIME(deal_time*1000)
    ,WATERMARK FOR deal_time_obj AS deal_time_obj - INTERVAL '60' SECOND
)WITH( 
  ..
)

Here DY_FROM_UNIXTIME converts the long value into a TIMESTAMP type.
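
For reference, a minimal sketch of what such a UDF can look like in Java (the class name and registration call below are illustrative rather than the actual DY_FROM_UNIXTIME implementation, and assume the Flink 1.10 legacy type system, where java.sql.Timestamp maps to TIMESTAMP(3)):

import java.sql.Timestamp;

import org.apache.flink.table.functions.ScalarFunction;

// Converts an epoch value in milliseconds into a SQL TIMESTAMP so that a
// WATERMARK can be declared on the resulting computed column.
public class FromUnixtimeMillis extends ScalarFunction {
    public Timestamp eval(Long epochMillis) {
        return epochMillis == null ? null : new Timestamp(epochMillis);
    }
}

// Registered before the DDL is executed, e.g.:
// tableEnv.registerFunction("DY_FROM_UNIXTIME", new FromUnixtimeMillis());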

oliver zhang,云长
zzh...@foxmail.com
 
From: Benchao Li
Sent: 2020-05-14 10:23
To: user-zh
Subject: Re: How to use a Long timestamp as the event time in SQL DDL
You can write a UDF as a computed column that converts the long into a TIMESTAMP type, and then define the watermark on that computed column.
 
zzh...@foxmail.com wrote on Thu, May 14, 2020 at 10:02 AM:
 
> Hi all,
> The message time in our Kafka records is a Long, containing both second and millisecond values.
> My questions are:
> 1. For Long-typed time data, how should a Flink SQL DDL be defined today so that a WATERMARK can be used?
> 2. If this is not possible, is there a corresponding plan for later?
>
> Tested version: Flink 1.10
>
>
> The SQL tested on Flink 1.10 is as follows:
> CREATE TABLE session_login (
> deal_time TIMESTAMP(3)
> ,aa VARCHAR
> ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND
>  )WITH(
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'topic',
> 'connector.startup-mode' = 'latest-offset',
> 'connector.properties.group.id' = 'group.id',
> 'connector.properties.zookeeper.connect' = 'ip:port',
> 'connector.properties.bootstrap.servers' = 'ip:port',
> 'format.type' = 'json',
> 'format.derive-schema' = 'true'
>  );
>
> If deal_time is defined as TIMESTAMP(3), JSON deserialization reports the following exception:
> Caused by: java.io.IOException: Failed to deserialize JSON object.
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> Caused by: java.time.format.DateTimeParseException: Text '1589420545'
> could not be parsed at index 0
> at
> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> ... 7 more
>
> If deal_time is defined as BIGINT, SQL validation fails with the following exception:
> Exception in thread "main"
> org.apache.calcite.runtime.CalciteContextException: From line 4, column 33
> to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> -
> <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>'
> '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>'
> '<DATETIME> - <DATETIME_INTERVAL>'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
> at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840)
> at
> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280)
> at
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96)
> at
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130)
> at
> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255)
> at
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668)
> at
> 

Re: How to use a Long timestamp as the event time in SQL DDL

2020-05-13 Thread Benchao Li
You can write a UDF as a computed column that converts the long into a TIMESTAMP type, and then define the watermark on that computed column.

zzh...@foxmail.com wrote on Thu, May 14, 2020 at 10:02 AM:

> Hi all,
> The message time in our Kafka records is a Long, containing both second and millisecond values.
> My questions are:
> 1. For Long-typed time data, how should a Flink SQL DDL be defined today so that a WATERMARK can be used?
> 2. If this is not possible, is there a corresponding plan for later?
>
> Tested version: Flink 1.10
>
>
> The SQL tested on Flink 1.10 is as follows:
> CREATE TABLE session_login (
> deal_time TIMESTAMP(3)
> ,aa VARCHAR
> ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND
>  )WITH(
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'topic',
> 'connector.startup-mode' = 'latest-offset',
> 'connector.properties.group.id' = 'group.id',
> 'connector.properties.zookeeper.connect' = 'ip:port',
> 'connector.properties.bootstrap.servers' = 'ip:port',
> 'format.type' = 'json',
> 'format.derive-schema' = 'true'
>  );
>
> If deal_time is defined as TIMESTAMP(3), JSON deserialization reports the following exception:
> Caused by: java.io.IOException: Failed to deserialize JSON object.
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> Caused by: java.time.format.DateTimeParseException: Text '1589420545'
> could not be parsed at index 0
> at
> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> ... 7 more
>
> If deal_time is defined as BIGINT, SQL validation fails with the following exception:
> Exception in thread "main"
> org.apache.calcite.runtime.CalciteContextException: From line 4, column 33
> to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> -
> <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>'
> '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>'
> '<DATETIME> - <DATETIME_INTERVAL>'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
> at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840)
> at
> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280)
> at
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96)
> at
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130)
> at
> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255)
> at
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668)
> at
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432)
> at
> org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518)
> at
> org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599)
> at
> 

How to use a Long timestamp as the event time in SQL DDL

2020-05-13 Thread zzh...@foxmail.com
Hi all,
    The message time in our Kafka records is a Long, containing both second and millisecond values.
My questions are:
1. For Long-typed time data, how should a Flink SQL DDL be defined today so that a WATERMARK can be used?
2. If this is not possible, is there a corresponding plan for later?

Tested version: Flink 1.10


The SQL tested on Flink 1.10 is as follows:
CREATE TABLE session_login (
    deal_time TIMESTAMP(3)
    ,aa VARCHAR
    ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND
 )WITH(
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'topic',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.group.id' = 'group.id',
    'connector.properties.zookeeper.connect' = 'ip:port',
    'connector.properties.bootstrap.servers' = 'ip:port',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
 );

If deal_time is defined as TIMESTAMP(3), JSON deserialization reports the following exception:
Caused by: java.io.IOException: Failed to deserialize JSON object.
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.time.format.DateTimeParseException: Text '1589420545' could not 
be parsed at index 0
at 
java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more

If deal_time is defined as BIGINT, SQL validation fails with the following exception:
Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: 
From line 4, column 33 to line 4, column 64: Cannot apply '-' to arguments of 
type '<BIGINT> - <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>'
'<DATETIME_INTERVAL> - <DATETIME_INTERVAL>'
'<DATETIME> - <DATETIME_INTERVAL>'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840)
at 
org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280)
at 
org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96)
at 
org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130)
at 
org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255)
at 
org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668)
at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432)
at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518)
at 
org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5586)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1690)
at 

Re: Array index out of bounds when implementing top-1 with the Flink SQL row_number() function in 1.10, asking the community for help

2020-05-13 Thread 1101300123
OK, looking forward to the fix. The project is about to go live, so I'm a bit nervous; I'm also looking at the relevant code snippets myself.


On May 14, 2020, 09:43, 刘大龙 wrote:
Hi,
云邪 has confirmed this is a bug and opened an issue to track it:
https://issues.apache.org/jira/browse/FLINK-17625

-Original Message-
From: 1101300123 
Sent: 2020-05-14 09:19:41 (Thursday)
To: "user-zh@flink.apache.org" 
Cc:
Subject: Array index out of bounds when implementing top-1 with the Flink SQL row_number() function in 1.10, asking the community for help

Has anyone seen this email? The error appears after the job has been running for roughly 10 hours of data;
it has been troubling me for several days.
My SQL statement is as follows, with some fields omitted:
select
a.contact_id,
...
a.code_contact_channel
from
(
select
contact_id,
service_no,
...
code_contact_channel,
row_number() over(partition by contact_id,service_no order by operate_time 
desc) as rn
from
table1
)a
join (
select
contact_id ,
mobile_no,
...
row_number() over(partition by contact_id,mobile_no order by create_time desc) 
as rn
from
table2 )b
on
a.contact_id = b.contact_id
and a.service_no = b.mobile_no
where
a.rn = 1
and b.rn = 1 ;
The program is deployed on YARN; after it runs for a few hours, the error log shows the following:


[flink-akka.actor.default-dispatcher-8695] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[contact_id, service_no], 
orderBy=[operate_time DESC], select=[operate_time, contact_id.]) -> 
Calc(select=[contact_id, start_time, contact_length, 
service_no...code_contact_channel]) (1/1) (52b8519ad9a44832a283c1760f385bf6) 
switched from RUNNING to FAILED.
java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:422)
at java.util.ArrayList.remove(ArrayList.java:499)
at 
org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElementWithoutRowNumber(AppendOnlyTopNFunction.java:205)
at 
org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:120)
at 
org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:46)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
[flink-akka.actor.default-dispatcher-8695] INFO 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
 - Calculating tasks to restart to recover the failed task 
28aa070d07f48addbf378d6ee01a29c6_0.


The top-N function has been supported in Flink SQL for a long time; I don't understand why this error occurs and would appreciate your advice.



--
刘大龙 (Dalong Liu)

Institute of Cyber-Systems and Control, Department of Control Science and Engineering, Zhejiang University, Room 217, New Industrial Control Building
Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
Tel: 18867547281


Re: Array index out of bounds when implementing top-1 with the Flink SQL row_number() function in 1.10, asking the community for help

2020-05-13 Thread 刘大龙
Hi,
云邪 has confirmed this is a bug and opened an issue to track it:
https://issues.apache.org/jira/browse/FLINK-17625

> -Original Message-
> From: 1101300123 
> Sent: 2020-05-14 09:19:41 (Thursday)
> To: "user-zh@flink.apache.org" 
> Cc: 
> Subject: Array index out of bounds when implementing top-1 with the Flink SQL row_number() function in 1.10, asking the community for help
> 
> Has anyone seen this email? The error appears after the job has been running for roughly 10 hours of data;
> it has been troubling me for several days.
> My SQL statement is as follows, with some fields omitted:
> select
>a.contact_id,
>...
>a.code_contact_channel
>   from
>(
>select
> contact_id,
> service_no,
> ...
> code_contact_channel,
> row_number() over(partition by contact_id,service_no order by 
> operate_time desc) as rn
>from
> table1
>)a
>   join (
>select
> contact_id ,
> mobile_no,
> ...
> row_number() over(partition by contact_id,mobile_no order by create_time 
> desc) as rn
>from
> table2 )b
> on
> a.contact_id = b.contact_id
> and a.service_no = b.mobile_no
>   where
>a.rn = 1
>and b.rn = 1 ;
> The program is deployed on YARN; after it runs for a few hours, the error log shows the following:
> 
> 
> [flink-akka.actor.default-dispatcher-8695] INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - 
> Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[contact_id, service_no], 
> orderBy=[operate_time DESC], select=[operate_time, contact_id.]) -> 
> Calc(select=[contact_id, start_time, contact_length, 
> service_no...code_contact_channel]) (1/1) (52b8519ad9a44832a283c1760f385bf6) 
> switched from RUNNING to FAILED.
> java.lang.ArrayIndexOutOfBoundsException: -1
>  at java.util.ArrayList.elementData(ArrayList.java:422)
>  at java.util.ArrayList.remove(ArrayList.java:499)
>  at 
> org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElementWithoutRowNumber(AppendOnlyTopNFunction.java:205)
>  at 
> org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:120)
>  at 
> org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:46)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>  at java.lang.Thread.run(Thread.java:748)
> [flink-akka.actor.default-dispatcher-8695] INFO 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
>  - Calculating tasks to restart to recover the failed task 
> 28aa070d07f48addbf378d6ee01a29c6_0.
> 
> 
> The top-N function has been supported in Flink SQL for a long time; I don't understand why this error occurs and would appreciate your advice.
>


--
刘大龙 (Dalong Liu)

Institute of Cyber-Systems and Control, Department of Control Science and Engineering, Zhejiang University, Room 217, New Industrial Control Building
Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
Tel: 18867547281


[CVE-2020-1960] Apache Flink JMX information disclosure vulnerability

2020-05-13 Thread Chesnay Schepler

CVE-2020-1960: Apache Flink JMX information disclosure vulnerability

Severity: Medium
(CVSS:3.1/AV:L/AC:H/PR:L/UI:N/S:U/C:H/I:L/A:H)

Vendor:
The Apache Software Foundation

Versions Affected:
Flink 1.1.0 to 1.1.5
Flink 1.2.0 to 1.2.1
Flink 1.3.0 to 1.3.3
Flink 1.4.0 to 1.4.2
Flink 1.5.0 to 1.5.6
Flink 1.6.0 to 1.6.4
Flink 1.7.0 to 1.7.2
Flink 1.8.0 to 1.8.3
Flink 1.9.0 to 1.9.2
Flink 1.10.0

Description:
When running a process with an enabled JMXReporter, with a port 
configured via metrics.reporter.<reporter name>.port, an attacker with 
local access to the machine and JMX port can execute a man-in-the-middle 
attack using a specially crafted request to rebind the JMXRMI registry 
to one under the attacker's control. This compromises any connection 
established to the process via JMX, allowing extraction of credentials 
and any other transferred data.


Mitigation:
All users should either
- remove the port parameter from the reporter configuration,
- upgrade to 1.9.3 or 1.10.1,
- obtain the source for their release from the Flink website or git and 
apply the patch corresponding to their version from the list below:


1.1: a61b5d2b362d11e7b9deeb2334d275325574bd7b
1.2: d2a051267ffbeef5c1fd981860fb7032d9ac8a60
1.3: 4f06bb75cd726096af43587ca4fb182b2e4bae2e
1.4: 12787eceb49c566b28aa876fc2892d21a0ec3d79
1.5: f9b4e0dea71abbcd6463c757577c70c45b3e6bbf
1.6: b8647b1ca019003ae939b7494bba4e54de167b6f
1.7: 5e0b7970a9aea74aba4ebffaa75c37e960799b93
1.8: 0e8e8062bcc159e9ed2a0d4a0a61db4efcb01f2f
1.9: 58b58f4b16a2e25c95b465377d43a51ad8ef3f6a
1.10: 804ae70024bf8be7c0c7093d02addb080c318662

Credit:
The issue was discovered in Flink by Jonathan Gallimore, Tomitribe and 
Colm O hEigeartaigh, Talend.
The underlying vulnerability pattern was discovered by An Trinh and 
published at Blackhat.




Re: changing the output files names in Streamfilesink from part-00 to something else

2020-05-13 Thread dhurandar S
Yes, we looked at it.
The problem is that the file name needs to be generated dynamically: based on
which organization's data we are receiving, we generate the file name from the
incoming data.

Is there any way we can achieve this?
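
(For reference, the part-file prefix/suffix configuration mentioned in the reply quoted below can be set roughly like this against the Flink 1.10 StreamingFileSink API; the prefix, suffix, path and encoder are illustrative, and this alone does not cover fully dynamic, data-driven file names:)

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Static prefix/suffix for part files, e.g. "orgdata-0-0.txt" instead of "part-0-0".
OutputFileConfig fileConfig = OutputFileConfig.builder()
        .withPartPrefix("orgdata")
        .withPartSuffix(".txt")
        .build();

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://my-bucket/output"),
                new SimpleStringEncoder<String>("UTF-8"))
        .withOutputFileConfig(fileConfig)
        .build();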

On Tue, May 12, 2020 at 8:38 PM Yun Gao  wrote:

> Hi Dhurandar:
>
> Currently StreamingFileSink should be able to change the prefix and
> suffix of the filename[1], it could be changed to something like
> <prefix>-0-0<suffix>. Could this solve your problem?
>
>
>  Best,
>   Yun
>
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#part-file-configuration
>
>
>
> --
> From: dhurandar S
> Date: 2020-05-13 05:13:04
> To: user; 
> Subject: changing the output files names in Streamfilesink from part-00 to
> something else
>
> We want to change the name of the file being generated as the output of
> our StreamingFileSink.
> When files are generated they are named part-00*; is there a way that we
> can change the name?
>
> In Hadoop, we can change RecordWriters and MultipleOutputs. May I please get
> some help in this regard? This is causing blockers for us and will force us
> to move to an MR job.
>
> --
> Thank you and regards,
> Dhurandar
>
>
>

-- 
Thank you and regards,
Dhurandar


[ANNOUNCE] Apache Flink 1.10.1 released

2020-05-13 Thread Yu Li
The Apache Flink community is very happy to announce the release of Apache
Flink 1.10.1, which is the first bugfix release for the Apache Flink 1.10
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/05/12/release-1.10.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Yu



Re: Flink Memory analyze on AWS EMR

2020-05-13 Thread Jacky D
Hi, Xintong

Thanks for pointing it out; after I set up the log path, it's working now.
So, in conclusion:

On EMR, to set up JITWatch in flink-conf.yaml, we should not include
quotes, and we should give a path to output the JIT log file. This is different from
setting it up on a standalone cluster.
Example:
env.java.opts: -XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading
-XX:+LogCompilation -XX:LogFile=/tmp/flinkmemdump.jit -XX:+PrintAssembly

Thanks everyone involved in this discussion!

Jacky

Xintong Song wrote on Tue, May 12, 2020 at 10:41 PM:

> Hi Jacky,
>
> I don't think ${FLINK_LOG_PREFIX} is available for Flink Yarn deployment.
> This is just my guess, that the actual file name becomes ".jit". You can
> try to verify that by looking for the hidden file.
>
> If it is indeed this problem, you can try to replace "${FLINK_LOG_PREFIX}"
> with "<LOG_DIR>/your-file-name.jit". The token "<LOG_DIR>" should be
> replaced with the proper log directory path by Yarn automatically.
>
> I noticed that the usage of ${FLINK_LOG_PREFIX} is recommended by Flink's
> documentation [1]. This is IMO a bit misleading. I'll try to file an issue
> to improve the docs.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/application_profiling.html#profiling-with-jitwatch
>
> On Wed, May 13, 2020 at 2:45 AM Jacky D  wrote:
>
>> hi, Arvid
>>
>> Thanks for the advice. I removed the quotes and it did create a yarn
>> session on EMR, but I didn't find any jit log file generated.
>>
>> The config with quotes works on a standalone cluster. I also tried to
>> dynamically pass the property within the yarn session command:
>>
>> flink-yarn-session -n 1 -d -nm testSession -yD 
>> env.java.opts="-XX:+UnlockDiagnosticVMOptions
>> -XX:+TraceClassLoading -XX:+LogCompilation
>> -XX:LogFile=${FLINK_LOG_PREFIX}.jit -XX:+PrintAssembly"
>>
>>
>> but I get the same result: the session is created, but I cannot find any jit log file
>> under the container logs.
>>
>>
>> Thanks
>>
>> Jacky
>>
>> Arvid Heise wrote on Tue, May 12, 2020 at 12:57 PM:
>>
>>> Hi Jacky,
>>>
>>> I suspect that the quotes are the actual issue. Could you try to remove
>>> them? See also [1].
>>>
>>> [1]
>>> http://blogs.perl.org/users/tinita/2018/03/strings-in-yaml---to-quote-or-not-to-quote.html
>>>
>>> On Tue, May 12, 2020 at 4:03 PM Jacky D  wrote:
>>>
 hi, Xintong

 Thanks for the reply. I attached those lines below from the application master
 start command:


 2020-05-11 21:16:16,635 DEBUG
 org.apache.hadoop.util.PerformanceAdvisory- Crypto
 codec org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec is not available.
 2020-05-11 21:16:16,635 DEBUG
 org.apache.hadoop.util.PerformanceAdvisory- Using
 crypto codec org.apache.hadoop.crypto.JceAesCtrCryptoCodec.
 2020-05-11 21:16:16,636 DEBUG org.apache.hadoop.hdfs.DataStreamer
  - DataStreamer block
 BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
 packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false
 lastByteOffsetInBlock: 1697
 2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
  - DFSClient seqno: 0 reply: SUCCESS
 downstreamAckTimeNanos: 0 flag: 0
 2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
  - DataStreamer block
 BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
 packet seqno: 1 offsetInBlock: 1697 lastPacketInBlock: true
 lastByteOffsetInBlock: 1697
 2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
  - DFSClient seqno: 1 reply: SUCCESS
 downstreamAckTimeNanos: 0 flag: 0
 2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
  - Closing old block
 BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315
 2020-05-11 21:16:16,641 DEBUG org.apache.hadoop.ipc.Client
 - IPC Client (1954985045) connection to
 ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #70
 org.apache.hadoop.hdfs.protocol.ClientProtocol.complete
 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
 - IPC Client (1954985045) connection to
 ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value
 #70
 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
  - Call: complete took 2ms
 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
 - IPC Client (1954985045) connection to
 ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #71
 org.apache.hadoop.hdfs.protocol.ClientProtocol.setTimes
 2020-05-11 21:16:16,645 DEBUG org.apache.hadoop.ipc.Client
 - IPC Client (1954985045) connection to
 ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop 

Re: Flink Streaming Job Tuning help

2020-05-13 Thread Senthil Kumar
Zhijiang,

Thanks for your suggestions. We will keep it in mind!

Kumar

From: Zhijiang 
Reply-To: Zhijiang 
Date: Tuesday, May 12, 2020 at 10:10 PM
To: Senthil Kumar , "user@flink.apache.org" 

Subject: Re: Flink Streaming Job Tuning help

Hi Kumar,


I can give some general ideas for further analysis.

> We are finding that flink lags seriously behind when we introduce the keyBy 
> (presumably because of shuffle across the network)
The `keyBy` breaks operator chaining, so it might have an obvious 
performance impact in practice. I guess that if your previous pipeline without keyBy 
could make use of the chaining mechanism,
the follow-up operator consumed the emitted records from the preceding 
operator directly, without the buffer serialization -> network shuffle 
-> buffer deserialization steps,
which matters especially since your record size of 10K is a bit large.

If the keyBy is necessary in your case, then you can further check for the current 
bottleneck, e.g. whether there is back pressure, which you can monitor from the web 
UI. If so, find which task is the
bottleneck causing the back pressure; you can trace it via network-related 
metrics.

Also check whether there is data skew in your case, meaning some tasks would process 
more records than others. If so, maybe you can increase the parallelism to 
balance the load.

Best,
Zhijiang
--
From: Senthil Kumar 
Send Time: 2020-05-13 (Wednesday) 00:49
To: user@flink.apache.org 
Subject:Re: Flink Streaming Job Tuning help

I forgot to mention, we are consuming said records from AWS kinesis and writing 
out to S3.

From: Senthil Kumar 
Date: Tuesday, May 12, 2020 at 10:47 AM
To: "user@flink.apache.org" 
Subject: Flink Streaming Job Tuning help

Hello Flink Community!

We have a fairly intensive flink streaming application, processing 8-9 million 
records a minute, with each record being 10k.
One of our steps is a keyBy operation. We are finding that flink lags seriously 
behind when we introduce the keyBy (presumably because of shuffle across the 
network).

We are trying to tune it ourselves (size of nodes, memory, network buffers, 
etc.), but before we spend way too much time on
this, would it be better to hire some “flink tuning expert” to get us through?

If so what resources are recommended on this list?

Cheers
Kumar



Flink 1.10 DDL job metrics not displayed

2020-05-13 Thread 了不起的盖茨比
For a DDL job, with the SQL written in code and submitted, the web UI does not display any metric data, such as how many records have been received. Is there something I have not configured? The job runs and produces results normally.

Re: When writing to ES6 with Flink 1.7.2, the error "request retries exceeded max retry timeout [30000]" sometimes occurs

2020-05-13 Thread 了不起的盖茨比
The log shows a timeout. If you confirm it is not a timeout problem, check the taskmanager log again to see whether an out-of-memory error prevents connecting to ES, or whether a bug during insertion keeps generating new classes and overflows the metaspace.





------ Original Message ------
From: aven.wu
Based on your situation, the Flink writes to ES are timing out, which is usually caused by insufficient ES throughput; you can take a look at the official recommendations:
https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
Also, for failed ES writes you can define a custom ActionRequestFailureHandler, e.g. to add the failed request to a failure queue or retry it.
Hope this helps.

Best
Aven

From: Yangze Guo
Sent: May 13, 2020, 16:21
To: user-zh@flink.apache.org
Subject: Re: When writing to ES6 with Flink 1.7.2, the error "request retries exceeded max retry timeout 
[30000]" sometimes occurs

Hello, does your log contain statements such as:

- Failed Elasticsearch bulk request:
- Failed Elasticsearch item request:
If so, please provide them.

From the error, the interaction with ES seems to have timed out. Check the network connectivity, or increase the timeout; see the documentation [1] for how:

esSinkBuilder.setRestClientFactory(
  restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
);

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/elasticsearch.html#elasticsearch-sink


Best,
Yangze Guo

On Wed, May 13, 2020 at 2:53 PM Jim Chen 

Re: Flink restart strategy on specific exception

2020-05-13 Thread Till Rohrmann
Yes, you are right Zhu Zhu. Extending
the RestartBackoffTimeStrategyFactoryLoader to also load custom
RestartBackoffTimeStrategies sounds like a good improvement for the future.

@Ken Krugler , the old RestartStrategy
interface did not provide the cause of the failure, unfortunately.

Cheers,
Till

On Wed, May 13, 2020 at 7:55 AM Zhu Zhu  wrote:

> Hi Ken,
>
> Custom restart-strategy was an experimental feature and was deprecated
> since 1.10. [1]
> That's why you cannot find any documentation for it.
>
> The old RestartStrategy was deprecated and replaced by
> RestartBackoffTimeStrategy since 1.10
> (unless you are using the legacy scheduler which was also deprecated).
> The new restart strategy, RestartBackoffTimeStrategy, will be able to know
> the exact failure cause.
> However, the new restart strategy does not support customization at the
> moment.
> Your requirement sounds reasonable to me and I think custom (new) restart
> strategy can be something to support later.
>
> @Till Rohrmann  @Gary Yao  what do
> you think?
>
> [1]
> https://lists.apache.org/thread.html/6ed95eb6a91168dba09901e158bc1b6f4b08f1e176db4641f79de765%40%3Cdev.flink.apache.org%3E
>
> Thanks,
> Zhu Zhu
>
> Ken Krugler wrote on Wed, May 13, 2020 at 7:34 AM:
>
>> Hi Til,
>>
>> Sorry, missed the key question…in the RestartStrategy.restart() method, I
>> don’t see any good way to get at the underlying exception.
>>
>> I can cast the RestartCallback to an ExecutionGraphRestartCallback, but I
>> still need access to the private execGraph to be able to get at the failure
>> info. Is there some other way in the restart handler to get at this?
>>
>> And yes, I meant to note you’d mentioned the required static method in
>> your email, I was asking about documentation for it.
>>
>> Thanks,
>>
>> — Ken
>>
>> ===
>> Sorry to resurface an ancient question, but is there a working example
>> anywhere of setting a custom restart strategy?
>>
>> Asking because I’ve been wandering through the Flink 1.9 code base for a
>> while, and the restart strategy implementation is…pretty tangled.
>>
>> From what I’ve been able to figure out, you have to provide a factory
>> class, something like this:
>>
>> Configuration config = new Configuration();
>> config.setString(ConfigConstants.RESTART_STRATEGY,
>> MyRestartStrategyFactory.class.getCanonicalName());
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.createLocalEnvironment(4, config);
>>
>> That factory class should extend RestartStrategyFactory, but it also
>> needs to implement a static method that looks like:
>>
>> public static MyRestartStrategyFactory
>> createFactory(Configuration config) {
>> return new MyRestartStrategyFactory();
>> }
>>
>> I wasn’t able to find any documentation that mentioned this particular
>> method being a requirement.
>>
>> And also the documentation at
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance
>>  doesn’t
>> mention you can set a custom class name for the restart-strategy.
>>
>> Thanks,
>>
>> — Ken
>>
>>
>> On Nov 22, 2018, at 8:18 AM, Till Rohrmann  wrote:
>>
>> Hi Kasif,
>>
>> I think in this situation it is best if you defined your own custom
>> RestartStrategy by specifying a class which has a `RestartStrategyFactory
>> createFactory(Configuration configuration)` method as `restart-strategy:
>> MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
>>
>> Cheers,
>> Till
>>
>> On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif  wrote:
>>
>>> Hello,
>>>
>>>
>>>
>>> Looking at existing restart strategies they are kind of generic. We have
>>> a requirement to restart the job only in case of specific exception/issues.
>>>
>>> What would be the best way to have a restart strategy which is based on a
>>> few rules, like looking at a particular type of exception or some extra
>>> condition checks which are application specific?
>>>
>>>
>>>
>>> Just a background on one specific issue which invoked this requirement
>>> is slots not getting released when the job finishes. In our applications,
>>> we keep track of jobs submitted with the amount of parallelism allotted to
>>> it.  Once the job finishes we assume that the slots are free and try to
>>> submit next set of jobs which at times fail with error  “not enough slots
>>> available”.
>>>
>>>
>>>
>>> So we think a job restart can solve this issue, but we want to
>>> restart only if this particular situation is encountered.
>>>
>>>
>>>
>>> Please let us know if there are better ways to solve this problem other
>>> than a restart strategy.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Kasif
>>>
>>>
>>>
>>> --
>>>
>>> Your Personal Data: We may collect and process information about you
>>> that may be subject to data protection laws. For more information about how
>>> we use and disclose your personal data, how we protect your information,
>>> our legal basis to use your 

Re: Incremental state with purging

2020-05-13 Thread Yun Tang
Hi

From your description, "output the state every y seconds and remove old 
elements", I think TTL [1] is the proper solution for your scenario. You 
could define the TTL of your state as y seconds so that the process function 
only prints elements from the last y seconds.
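
A minimal sketch of such a configuration on a MapState descriptor (the 60-second value and the state name are only placeholders for your y and your own types):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Entries expire y seconds (here 60) after they were last written, and
// already-expired entries are never returned to the process function.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(60))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

MapStateDescriptor<String, String> descriptor =
        new MapStateDescriptor<>("recent-elements", String.class, String.class);
descriptor.enableTimeToLive(ttlConfig);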


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Best
Yun Tang

From: Annemarie Burger 
Sent: Wednesday, May 13, 2020 2:46
To: user@flink.apache.org 
Subject: Incremental state with purging

Hi,

I'm trying to implement the most efficient way to incrementally put incoming
DataStream elements in my (map)state, while removing old elements (older
than x) from that same state. I then want to output the state every y
seconds. I've looked into using the ProcessFunction with onTimer, or
building my own Trigger for a window function, but I struggle with putting
all this together in a logical and efficient way. Since the state is very
big I don't want to duplicate it over multiple (sliding)windows. Does
anybody know the best way to achieve this? Some pseudo code would be very
helpful.

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Prometheus Pushgateway Reporter Can not DELETE metrics on pushgateway

2020-05-13 Thread Yun Tang
Hi

From our experience, instead of offering more resources for the Prometheus 
push-gateway and servers, we could leverage Flink's features to avoid sending 
unnecessary data (especially high-dimension tags, e.g. task_attempt_id) after 
Flink 1.10. In general, we could exclude 
"operator_id;task_id;task_attempt_id", which are rarely used, via 
metrics.reporter.<reporter>.scope.variables.excludes.
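
For example, following the option key and variables named above (please double-check the exact option name and its availability against the metrics documentation [1]), the flink-conf.yaml entry for a reporter named promgateway could look like:

metrics.reporter.promgateway.scope.variables.excludes: operator_id;task_id;task_attempt_id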

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#reporter

Best
Yun Tang

From: Thomas Huang 
Sent: Wednesday, May 13, 2020 12:00
To: 李佳宸 ; user@flink.apache.org 
Subject: Re: Prometheus Pushgateway Reporter Can not DELETE metrics on 
pushgateway

I met this issue three months ago. In the end, we came to the conclusion that the 
Prometheus push gateway cannot handle high-throughput metric data. We 
solved the issue via service discovery instead. We changed the Prometheus metric 
reporter code, adding registration logic so that the job can expose its host 
and port on a discovery service, and then wrote a plugin for Prometheus that 
gets the service list and pulls the metrics from the Flink jobs.


From: 李佳宸 
Sent: Wednesday, May 13, 2020 11:26:26 AM
To: user@flink.apache.org 
Subject: Prometheus Pushgateway Reporter Can not DELETE metrics on pushgateway

Hi,

I got stuck using Prometheus/Pushgateway to collect metrics. Here is my 
reporter configuration:

metrics.reporter.promgateway.class: 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: true

And the version information:
Flink 1.9.1
Prometheus 2.18
PushGateway 1.2 & 0.9 (I had already try them both)

I found that when the Flink cluster restarts, metrics show up with a new 
jobName carrying a random suffix. But the metrics with the jobName from before 
the restart still exist (their values stop updating). Since Prometheus still 
periodically pulls the data from the pushgateway, I get a bunch of time series 
whose values never change.

It looks like:


# HELP flink_jobmanager_Status_JVM_CPU_Load Load (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Load gauge
flink_jobmanager_Status_JVM_CPU_Load{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"}
 0
flink_jobmanager_Status_JVM_CPU_Load{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"}
 0.0006602344673593189
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"}
 4.54512e+09
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"}
 8.24809e+09
# HELP flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded ClassesLoaded 
(scope: jobmanager_Status_JVM_ClassLoader)
# TYPE flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded gauge
flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"}
 5984
flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"}
 6014
# HELP flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded ClassesUnloaded 
(scope: jobmanager_Status_JVM_ClassLoader)
# TYPE flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded gauge
flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"}
 0
flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"}
 0

Ps: This cluster has one JobManager.

In my understanding, when I set metrics.reporter.promgateway.deleteOnShutdown 
to true, the old metrics information should be deleted from the pushgateway. But 
somehow it didn’t work.
Is my understanding of this configuration right? Is there any solution for 
deleting metrics from the pushgateway?

Thanks!


Re: Register time attribute while converting a DataStream to Table

2020-05-13 Thread Dawid Wysakowicz
Hi,

Unfortunately, consuming an upsert stream is not supported yet.
It's not as easy as adding the type information there as you suggested.
Even if you do that it will still be considered to be an append message
internally by the planner. There is an ongoing effort (FLIP-95[1]) to
support it in Flink 1.11.


Best,

Dawid


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces


On 13/05/2020 01:03, Jiahui Jiang wrote:
> Hello Flink friends, I have a retract stream in the format of
> 'DataStream' that I want to register into my table environment,
> and also expose processing time column in the table.
>
> For a regular datastream, I have been
> doing 'tableEnvironment.createTemporaryView(path, dataStream,
> 'field1,field2, ..,__processing_time_column.proctime')' with no
> issue. But for this retract stream, I was getting an error
> "org.apache.flink.table.api.ValidationException: Too many fields
> referenced from an atomic type."
>
> Digging a little bit deeper, in TypeInfoUtils#extractFieldInformation
> ,
> it doesn't handle CRowTypeInfo as a known case. Looking at the
> behavior of 
>
> Since it's a standard CompositeType, instead of only handling
> 'if (inputType instanceof PojoTypeInfo)', can we just add CRowTypeInfo
> here too? Is there any risk that I'm not aware of?
>
> Thank you!




Re: Statefun 2.0 questions

2020-05-13 Thread Wouter Zorgdrager
Dear Igal, all,

Thanks a lot. This is very helpful. I understand the architecture a bit
more now. We can just scale the stateful functions and put a load balancer
in front and Flink will contact them. The only part of the scaling I don't
understand yet is how to scale the 'Flink side'. So if I understand
correctly, the Kafka ingress/egress part runs on the Flink cluster and
contacts the remote workers through HTTP. How can I scale this Kafka part
then? For a normal Flink job I would just change the parallelism, but I
couldn't really find that option yet. Is there some value I need to set in
the module.yaml?

Once again, thanks for the help so far. It has been useful.

Regards,
Wouter

On Wed, May 13, 2020 at 00:03, Igal Shilman wrote:

> Hi Wouter,
>
> Triggering a stateful function from a frontend indeed requires an ingress
> between them, so the way you've approached this is also the way we were
> thinking of.
> As Gordon mentioned a potential improvement might be an HTTP ingress, that
> would allow triggering stateful functions directly from the front end
> servers.
> But this kind of ingress is not implemented yet.
>
> Regarding scaling: Your understanding is correct, you can scale both the
> Flink cluster and the remote "python-stateful-function" cluster
> independently.
> Scaling the Flink cluster, tho, requires taking a savepoint, bumping the
> job parallelism, and starting the cluster with more workers from the
> savepoint taken previously.
>
> Scaling "python-stateful-function" workers can be done transparently to
> the Flink cluster, but the exact details are deployment specific.
> - For example the python workers are a k8s service.
> - Or the python workers are deployed behind a load balancer
> - Or you add new entries to the DNS record of your python worker.
>
> I didn't understand "ensuring that it ends up in the correct Flink job",
> can you please clarify?
> Flink would be the one contacting the remote workers and not the other way
> around. So as long as the new instances
> are visible to Flink they would be reached with the same shared state.
>
> I'd recommend watching [1] and the demo at the end, and [2] for a demo
> using stateful functions on AWS lambda.
>
> [1] https://youtu.be/NF0hXZfUyqE
> [2] https://www.youtube.com/watch?v=tuSylBadNSo
>
> It seems like you are on the correct path!
> Good luck!
> Igal.
>
>
> On Tue, May 12, 2020 at 11:18 PM Wouter Zorgdrager 
> wrote:
>
>> Hi Igal, all,
>>
>> In the meantime we found a way to serve Flink stateful functions in a
>> frontend. We decided to add another (set of) Flask application(s) which
>> link to Kafka topics. These Kafka topics then serve as ingress and egress
>> for the statefun cluster. However, we're wondering how we can scale this
>> cluster. On the documentation page some nice figures are provided for
>> different setups but no implementation details are given. In our case we
>> are using a remote cluster so we have a Docker instance containing the
>> `python-stateful-function` and of course the Flink cluster containing a
>> `master` and `worker`. If I understood correctly, in a remote setting, we
>> can scale both the Flink cluster and the `python-stateful-function`.
>> Scaling the Flink cluster is trivial because I can add just more
>> workers/task-managers (providing more taskslots) just by scaling the worker
>> instance. However, how can I scale the stateful function also ensuring that
>> it ends up in the correct Flink job (because we need shared state there). I
>> tried scaling the Docker instance as well but that didn't seem to work.
>>
>> Hope you can give me some leads there.
>> Thanks in advance!
>>
>> Kind regards,
>> Wouter
>>
>> On Thu, May 7, 2020 at 17:17, Wouter Zorgdrager <
>> zorgdrag...@gmail.com> wrote:
>>
>>> Hi Igal,
>>>
>>> Thanks for your quick reply. Getting back to point 2, I was wondering if
>>> you could trigger indeed a stateful function directly from Flask and also
>>> get the reply there instead of using Kafka in between. We want to
>>> experiment running stateful functions behind a front-end (which should be
>>> able to trigger a function), but we're a bit afraid that using Kafka
>>> doesn't scale well if on the frontend side a user has to consume all Kafka
>>> messages to find the correct reply/output for a certain request/input. Any
>>> thoughts?
>>>
>>> Thanks in advance,
>>> Wouter
>>>
>>> On Thu, May 7, 2020 at 10:51, Igal Shilman wrote:
>>>
 Hi Wouter!

 Glad to read that you are using Flink for quite some time, and also
 exploring with StateFun!

 1) yes it is correct and you can follow the Dockerhub contribution PR
 at [1]

 2) I’m not sure I understand what do you mean by trigger from the
 browser.
 If you mean, for testing / illustration purposes triggering the
 function independently of StateFun, you would need to write some JavaScript
 and preform the POST (assuming CORS are enabled)
 Let me know if you’d like getting further 

Re: Re: Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-13 Thread Theo Diefenthal
Hi Guowei, 

Impala is a database that can execute fast SQL Queries on parquet data. It has 
its own small metadata store for the parquet-tables created in there. In that 
store, it remembers the .parquet files in each partition and also stores some 
statistics like number of rows and so on. 

If I have a new .parquet file in my partition (or a new partition), I need to 
tell Impala about it. Otherwise, Impala won't take those files into account for 
its queries. So I execute a query to impala like 
ALTER TABLE MYDATA ADD IF NOT EXISTS PARTITION (partitionkey="20200513T10") 
Impala will add this partition and/or scan it for new .parquet-files and update 
its metastore. This can be run more generically like 
ALTER TABLE MYDATA RECOVER PARTITIONS 
and/or 
REFRESH MYDATA 
But those queries take more time to execute. Furthermore, I want to update the 
table statistics as well (Prior stuff just told impala about new .parquet 
files). I run a query like this 
COMPUTE INCREMENTAL STATS MYDATA PARTITION (partitionkey="20200513T10") 
This query can run for a rather long time, speaking about a few minutes for 
reasonable tables of few terabytes in size. I can leave the partitionkey stuff 
away and perform the query over the entire table, but then, it will take even 
more time to be computed. (Note that I think of optimizing my solution a bit 
and issue the Incremental Stats statement with the watermark in a later 
version. The stats are not required for an impala table, it just helps its 
planner for table with joins on how to build the execution plan. So I can wait 
with updating the stats up until the watermark passes and perform this query 
really only once) 

Furthermore, in our prod environment, we are not allowed to have too many 
simultaneous connections to Impala. The duration of the SQL statements and the 
requirement that we don't have too many connections led us to have those impala 
updates in a parallelism 1 task. Usually, all our task managers write data to 
the same partition (Streaming events from "now" and partitioned tables on an 
hourly basis). So there is no need that each taskmanager tells impala to update 
the very same partition multiple times. In my parallelism 1 task, I see that 
they all worked on the same partition and submit one query to impala to update 
this partition. 
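For illustration, a minimal sketch of such a parallelism-1 notifier sink, assuming plain JDBC access to Impala; the JDBC URL, table name and partition column below are placeholders: 

import java.sql.Connection; 
import java.sql.DriverManager; 
import java.sql.Statement; 
import org.apache.flink.configuration.Configuration; 
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; 

// Parallelism-1 sink that tells Impala about newly written partitions. 
// Assumes an Impala JDBC driver on the classpath; URL and table are placeholders. 
public class ImpalaPartitionSink extends RichSinkFunction<String> { 

    private transient Connection connection; 

    @Override 
    public void open(Configuration parameters) throws Exception { 
        connection = DriverManager.getConnection("jdbc:impala://impala-host:21050/default"); 
    } 

    @Override 
    public void invoke(String partitionKey, Context context) throws Exception { 
        // Idempotent thanks to IF NOT EXISTS; partitionKey comes from the 
        // messages emitted in notifyCheckpointComplete upstream. 
        try (Statement stmt = connection.createStatement()) { 
            stmt.execute("ALTER TABLE MYDATA ADD IF NOT EXISTS PARTITION (partitionkey=\"" + partitionKey + "\")"); 
        } 
    } 

    @Override 
    public void close() throws Exception { 
        if (connection != null) { 
            connection.close(); 
        } 
    } 
} 

// wired in behind the wrapped process function: 
// partitionUpdates.addSink(new ImpalaPartitionSink()).setParallelism(1); 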

Having a notifier sounds great, if it can be executed with parallelism 1 for 
all sink tasks. 

Best regards 
Theo 


Von: "Guowei Ma"  
An: "Theo Diefenthal"  
CC: "user" , "yungao gy"  
Gesendet: Mittwoch, 13. Mai 2020 09:15:37 
Betreff: Re: 回复:Re: Writing _SUCCESS Files (Streaming and Batch) 

Hi, Theo 
Thank you for sharing your solution. 
From your description, it seems that what you need is a listener that could 
notify the state change of the partition/bucket: created/updated/closed. 
(maybe you don't need the close notify). 
I am not familiar with Impala. So what I want to know is why you need to be 
notified every time the partition gets new data. Would you like to give 
some more detailed description? 

Best, 
Guowei 


Theo Diefenthal  
wrote on Wednesday, May 13, 2020, 12:00 AM: 



Hi Yun, 

For me, that sounds quite nice. I implemented the same for my application a few 
weeks ago, but of course tailored only to my app. 
What I did: 
1. I wrapped the Parquet-StreamingFileSink into a Process-Function. 
2. I extended the default ProcessOperator and instead of 
"notifyCheckpointComplete(long checkpointId)", I provided my 
WrappedProcessFunction a "notifyCheckpointComplete(checkointId, 
lastCommitWatermark)". 
3. I added a custom sink with parallelism 1 behind the WrappedProcessFunction. 
4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a 
message downstream to the parallelism 1 sink containing data about which 
partitions were written to in the phase since the last checkpoint. 
5. In the parallelism 1 sink, I make sure that I get the messages from all 
upstream tasks (give the constructor an int parameter telling it the parallelism 
of the WrappedProcessFunction) and then perform my parallelism 1 operation, in 
my case, telling Impala which partitions were added or got new data. Luckily, 
in case of Impala, that operation can be made idempotent so I only needed to 
make sure that I have an at least once processing from the state perspective 
here. 

I had to go for notifyCheckpointComplete as only there, the parquet files are 
ultimately committed and thus available for spark, impala and so on. 

So if you go on with that issue, I'd be really happy to be able to customize 
the solution and e.g. get rid of my custom setup by only specifying kind of a 
lambda function which should be run with parallelism 1 and update impala. That 
function would however still need the info which partitions were updated/added. 
And in my case, I was not really interested in the watermark (I sent it 
downstream only 

Reply: When using flink1.7.2 to write to ES6, sometimes the error occurs: request retries exceeded max retry timeout [30000]

2020-05-13 Thread aven . wu
Hi 
Based on your situation, the Flink write to ES is timing out, which is usually caused by insufficient ES throughput; you can have a look at the official recommendations:
https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
In addition, for failed ES writes you can implement a custom ActionRequestFailureHandler, in which you can push the record to a failure queue, retry it, and so on (a minimal sketch follows below).
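For illustration, a hedged sketch of such a handler; it loosely follows the pattern from the Flink Elasticsearch connector documentation and is a starting point rather than tested code:

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

// Re-queues requests that ES rejected because it was overloaded and
// rethrows everything else (which fails the sink).
public class RetryOnRejectionFailureHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            indexer.add(action);  // retry on temporary overload
        } else {
            throw failure;        // or write to a dead-letter queue instead
        }
    }
}

// wired into the sink builder:
// esSinkBuilder.setFailureHandler(new RetryOnRejectionFailureHandler());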
Hope this helps.

Best
Aven

From: Yangze Guo
Sent: 2020-05-13 16:21
To: user-zh@flink.apache.org
Subject: Re: When using flink1.7.2 to write to ES6, sometimes the error occurs: request retries exceeded max retry timeout 
[30000]

Hello, do your logs contain statements like the following?

- Failed Elasticsearch bulk request:
- Failed Elasticsearch item request:
If so, please share them.

Judging from the error, the interaction with ES has timed out; check the network connectivity or increase the timeout. See the documentation [1] for how to do this.

esSinkBuilder.setRestClientFactory(
  restClientBuilder -> {
restClientBuilder.setDefaultHeaders(...)
restClientBuilder.setMaxRetryTimeoutMillis(...)
restClientBuilder.setPathPrefix(...)
restClientBuilder.setHttpClientConfigCallback(...)
  }
);

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/elasticsearch.html#elasticsearch-sink


Best,
Yangze Guo

On Wed, May 13, 2020 at 2:53 PM Jim Chen  wrote:
>
> Hi all,
>
> When I use flink1.7.2 to write to ES6, I sometimes get the error: request retries exceeded max retry timeout
> [30000]. The error message is as follows:
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> 

Re: When using flink1.7.2 to write to ES6, sometimes the error occurs: request retries exceeded max retry timeout [30000]

2020-05-13 Thread Yangze Guo
Hello, do your logs contain statements like the following?

- Failed Elasticsearch bulk request:
- Failed Elasticsearch item request:
If so, please share them.

Judging from the error, the interaction with ES has timed out; check the network connectivity or increase the timeout. See the documentation [1] for how to do this.

esSinkBuilder.setRestClientFactory(
  restClientBuilder -> {
restClientBuilder.setDefaultHeaders(...)
restClientBuilder.setMaxRetryTimeoutMillis(...)
restClientBuilder.setPathPrefix(...)
restClientBuilder.setHttpClientConfigCallback(...)
  }
);

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/elasticsearch.html#elasticsearch-sink
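
For illustration, a hedged, filled-in version of the snippet above; the 60000/120000 millisecond values are placeholders and esSinkBuilder refers to the ElasticsearchSink.Builder from the snippet:

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;

// Raise the bulk retry timeout and the per-request connect/socket timeouts.
RestClientFactory restClientFactory = restClientBuilder -> {
    restClientBuilder.setMaxRetryTimeoutMillis(120000);
    restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
            requestConfigBuilder
                    .setConnectTimeout(60000)
                    .setSocketTimeout(120000));
};
esSinkBuilder.setRestClientFactory(restClientFactory);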


Best,
Yangze Guo

On Wed, May 13, 2020 at 2:53 PM Jim Chen  wrote:
>
> Hi all,
>
> When I use flink1.7.2 to write to ES6, I sometimes get the error: request retries exceeded max retry timeout
> [30000]. The error message is as follows:
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> 

Reply: Re: flink10 reading kafka error

2020-05-13 Thread pengchenglin
Solved it last night; I haven't tracked down the root cause yet. The fix was to replace the createTemporaryTable method with the previously deprecated registerTableSource method; nothing else needed to change.
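
For illustration, a minimal sketch of that workaround, assuming the same Kafka descriptor and Schema as in the code quoted further below (only the last call changes):

// Register the source via the deprecated API so the proctime attribute survives;
// kafkaDescriptor and schema stand for the objects built in the quoted code below.
tableEnv.connect(kafkaDescriptor)
        .withFormat(new Csv())
        .withSchema(schema)
        .inAppendMode()
        .registerTableSource("sourceTable");   // instead of .createTemporaryTable("sourceTable")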



pengchengl...@163.com
 
From: PCL
Sent: 2020-05-12 23:39
To: user-zh
Subject: Re: Reply: flink10 reading kafka error
 
 
 
Thanks for the reply!
The strange thing is that executing sqlQuery works without problems:
/*Table tb1 =tableEnv.sqlQuery("select sum(amount),TUMBLE_END(proctime, 
INTERVAL '5' SECOND)" +
" from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND) ");
tb1.printSchema();*/
After uncommenting this, the printed schema is
root
|-- EXPR$0: DOUBLE
|-- EXPR$1: TIMESTAMP(3)
 
 
 
 
 
 
 
 
 
 
 
On 2020-05-12 22:36:17, "忝忝向��" <153488...@qq.com> wrote:
>Isn't the TIMESTAMP(3) time format the problem?
>
>
>-- Original message --
>From: "PCL"    Sent: Tuesday, May 12, 2020, 9:43 PM
>To: "user-zh"
>Subject: flink10 reading kafka error
>
>
>
>Hi all:
>Has anyone run into this problem: Window aggregate can only be defined over a time attribute 
>column, but TIMESTAMP(3) encountered.
>Both event time and processing time report this error; the flink and blink planners report roughly the same error.
>Version as follows:
>Code as follows:
>// get the execution environment
>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>//EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
>// create a tableEnvironment
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>
>Schema schema = new Schema()
>        //.field("id", "VARCHAR").from("id")
>        .field("id", "STRING")
>        //.field("name", "VARCHAR")
>        .field("amount", "DOUBLE")
>        .field("proctime", Types.SQL_TIMESTAMP).proctime()
>        //.field("rowtime", Types.SQL_TIMESTAMP)
>        //        .rowtime(
>        //                new Rowtime()
>        //                        .timestampsFromField("eventtime")
>        //                        .watermarksPeriodicBounded(2000))
>        ;
>
>// "0.8", "0.9", "0.10", "0.11", and "universal"
>tableEnv.connect(new Kafka().version("universal")
>                .topic("source0511")
>                .property("zookeeper.connect", "172.16.44.28:7758")
>                .property("bootstrap.servers", "172.16.44.28:9096")
>                .property("group.id", "source0511-group")
>                .startFromEarliest()
>        )
>        .withFormat(new Csv())
>        .withSchema(schema)
>        .inAppendMode()
>        .createTemporaryTable("sourceTable");
>
>tableEnv.connect(
>        new Kafka()
>                .version("universal")
>                // "0.8", "0.9", "0.10", "0.11", and "universal"
>                .topic("sink0511")
>                .property("acks", "all")
>                .property("retries", "0")
>                .property("batch.size", "16384")
>                .property("linger.ms", "10")
>                .property("zookeeper.connect", "172.16.44.28:7758")
>                .property("bootstrap.servers", "172.16.44.28:9096")
>                .sinkPartitionerFixed())
>        .inAppendMode()
>        .withFormat(new Json())
>        .withSchema(
>                new Schema().field("totalamount", "DOUBLE")
>                        //.field("total", "INT")
>                        .field("time", Types.SQL_TIMESTAMP)
>        )
>        .createTemporaryTable("sinkTable");
>
>tableEnv.sqlUpdate("insert into sinkTable"
>        + " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) "
>        + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)");
>//SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
>//    FROM user_actions
>//    GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
>env.execute("test");


Re: Flink BLOB server port exposed externally

2020-05-13 Thread Dawid Wysakowicz
Hi Omar,

Theoretically I think it could be possible to change the address on
which the BlobServer runs (even to localhost). There is no configuration
option for it now and the BlobServer always binds to the wildcard. One
important aspect to consider here is that the BlobServer must be
accessible from all the components of the cluster: taskmanagers,
jobmanager (if I am not mistaken).

@Arvid, wouldn't changing line 192 in BlobServer:

        this.serverSocket = NetUtils.createSocketFromPorts(ports,
                (port) -> socketFactory.createServerSocket(port,
finalBacklog));

to e.g.

        this.serverSocket = NetUtils.createSocketFromPorts(ports,
              (port) -> socketFactory.createServerSocket(port,
finalBacklog, InetAddress.getByName(configuration.get(BLOB_HOSTNAME))));

do the trick?

That said, I think for now your only option is what Arvid suggested.
Remember though that by default the BlobServer is exposed on an OS-chosen port,
so it might change if you restart your cluster. You can set a
static port/range with the 'blob.server.port' configuration option (a small
sketch follows below). If you feel strongly about the requirement to configure
the host as well, feel free to open a jira ticket.
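
For illustration, a hedged sketch of pinning that port from code; the equivalent is setting 'blob.server.port' in flink-conf.yaml, and the port values below are placeholders:

import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;

// Pin the BLOB server to a fixed port (or range) so a firewall rule can be scoped to it.
Configuration flinkConfig = new Configuration();
flinkConfig.setString(BlobServerOptions.PORT, "6124");           // single port
// flinkConfig.setString(BlobServerOptions.PORT, "50100-50200"); // or a port range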

On 12/05/2020 13:34, Arvid Heise wrote:
> Hi Omar,
>
> wouldn't it be possible to just create an iptables rule that allows
> access to 1098 only from localhost? I don't think you can open a
> socket just for localhost programmatically (at least not from Java).
>
> Best,
>
> Arvid
>
> On Tue, May 12, 2020 at 12:51 PM Omar Gawi  > wrote:
>
> Hi All,
>
> I have Apache Flink running as part of our java program , on a
> linux machine.
> The Flink runs on thread(s) within the same java process.
> I see that the machine has the BLOB server port 1098 exposed to
> the outside :
>
> davc@sdavc:~$ netstat -anp | grep LISTEN
>
> (Not all processes could be identified, non-owned process info
>
> will not be shown, you would have to be root to see it all.)
>
> tcp    0  0 0.0.0.0:22  
> 0.0.0.0:*   LISTEN  -
>
> tcp    0  0 127.0.0.1:5432
>   0.0.0.0:*  
> LISTEN  311/postgres
>
> tcp6   0  0 :::8080
> :::*    LISTEN  -
>
> tcp6       0  0 :::21  
> :::*    LISTEN  -
>
> tcp6   0  0 :::22  
> :::*    LISTEN  -
>
> tcp6   0  0 ::1:5432   
> :::*    LISTEN  311/postgres
>
> tcp6   0  0 :::8443
> :::*    LISTEN  -
>
> *tcp6   0  0 :::1098
> :::*    LISTEN  -* 
>    
>
> This raises security concerns for our team: when an external
> user/system opens a connection (for telnet or other protocols) to
> this port (accidentally or not), we get the error below in the java
> app log:
>
> 2020-04-23 07:54:58 ERROR BlobServerConnection:131 - Error while
> executing BLOB connection.
>
> java.io.IOException: Unknown operation 3
>
>    at
> 
> org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:122)
>
>
> My question is if there is a way to avoid exposing this port to the
> outside, and keep it available only for its original purpose:
> serving the localhost/127.0.0.1 requests which
> come from the flink engine.
>
>
> Thank you and stay safe.
>
> Omar
>
>
>




Re: Flink consuming rate increases slowly

2020-05-13 Thread Dawid Wysakowicz
Hi Eyal,

Could you explain your job a bit more? Did you increase the parallelism
of your job? What does it do? Does it perform any time based operations?
How do you measure the processing rate?

Best,

Dawid

On 10/05/2020 21:18, Chen Qin wrote:
> Hi Eyal,
>
> It’s unclear what the warmup phase does in your use case. Usually we see
> Flink start consuming at a high rate and drop to a point the downstream can
> handle.
>
> Thanks
> Chen
>
>> On May 10, 2020, at 12:25 AM, Eyal Pe'er > > wrote:
>>
>> Hi all,
>> Lately I've added more resources to my Flink cluster which required a
>> restart of all apps.
>> From the cluster side, the only change I made, is to add more task slots.
>> On the cluster I have a streaming app that consumes from Kafka and
>> sinks to files.
>> I noticed that since the restart, the application's "warmup" has
>> been impacted dramatically.
>> Before, the change it took few minutes for the app to start and
>> consume normally (from my point of view, normally is a stable rate) -
>> from 0 to 16K events per second in 4 minutes.
>> Now, after the change, it takes hours till it stabilizes on the
>> normal processing rate- from 0 to 12K events per second in 3 hours.
>> The data source behavior hasn’t changed (same incoming rate,
>> partitions, servers etc.).
>> I am aware of the backpressure mechanism in Flink, but it seems like
>> it works too slowly here.
>> Is there a way to speed it up or control it? 
>>  
>> Thanks a lot
>> Eyal Peer
>




Re: Flink Metrics in kubernetes

2020-05-13 Thread Averell
Hi Gary,

Sorry for the false alarm. It's caused by a bug in my deployment - no
metrics were added into the registry.
Sorry for wasting your time.

Thanks and best regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Reply: Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-13 Thread Guowei Ma
Hi, Theo
Thank you for sharing your solution.
From your description, it seems that what you need is a listener that could
notify the state change of the partition/bucket: created/updated/closed.
(maybe you don't need the close notify).
I am not familiar with Impala. So what I want to know is why you need to be
notified every time the partition gets new data. Would you like to give
some more detailed description?

Best,
Guowei


Theo Diefenthal  wrote on Wednesday, May 13, 2020, 12:00 AM:

> Hi Yun,
>
> For me, that sounds quite nice. I implemented the same for my application
> a few weeks ago, but of course tailored only to my app.
> What I did:
> 1. I wrapped the Parquet-StreamingFileSink into a Process-Function.
> 2. I extended the default ProcessOperator and instead of
> "notifyCheckpointComplete(long checkpointId)", I provided my
> WrappedProcessFunction a "notifyCheckpointComplete(checkointId,
> lastCommitWatermark)".
> 3. I added a custom sink with parallelism 1 behind the
> WrappedProcessFunction.
> 4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a
> message downstream to the parallelism 1 sink containing data about which
> partitions were written to in the phase since the last checkpoint.
> 5. In the parallelism 1 sink, I make sure that I get the messages from all
> upstream tasks (give the constructor an int parameter telling it the
> parallelism of the WrappedProcessFunction) and then perform my parallelism
> 1 operation, in my case, telling Impala which partitions were added or got
> new data. Luckily, in case of Impala, that operation can be made idempotent
> so I only needed to make sure that I have an at least once processing from
> the state perspective here.
>
> I had to go for notifyCheckpointComplete as only there, the parquet files
> are ultimately committed and thus available for spark, impala and so on.
>
> So if you go on with that issue, I'd be really happy to be able to
> customize the solution and e.g. get rid of my custom setup by only
> specifying kind of a lambda function which should be run with parallelism
> 1 and update impala. That function would however still need the info which
> partitions were updated/added.
> And in my case, I was not really interested in the watermark (I sent it
> downstream only for metric purposes) but want to tell impala after each
> commit which partitions changed, regardless of the value from the watermark.
>
> Best regards
> Theo
>
> --
> *Von: *"Yun Gao" 
> *An: *"Robert Metzger" , "Jingsong Li" <
> jingsongl...@gmail.com>
> *CC: *"Peter Groesbeck" , "user" <
> user@flink.apache.org>
> *Gesendet: *Dienstag, 12. Mai 2020 10:36:59
> *Betreff: *回复:Re: Writing _SUCCESS Files (Streaming and Batch)
>
> Hi Peter,
>
> Sorry for missing the question and for the late response. I'm currently
> working together with Jingsong on the issue to support "global committing"
> (like writing a _SUCCESS file or adding partitions to the hive store) after
> buckets are terminated. In 1.11 we may first support watermark/time related
> buckets in Table/SQL API, and we are also thinking of supporting "global
> committing" for arbitrary bucket assigner policy for StreamingFileSink
> users. The current rough thought is to let users specify when a bucket is
> terminated on a single task, and the OperatorCoordinator[1] of the sink
> will aggreate the information from all subtasks about this bucket and do
> the global committing if the bucket has been finished on all the subtasks,
> but this is still under thinking and discussion. Any thoughts or
> requirements on this issue are warmly welcome.
>
> Best,
>  Yun
>
>
> [1] OperatorCoordinator is introduced in FLIP-27:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface.
> This is a component resides in JobManager and could communicate with all
> the subtasks of the corresponding operator, thus it could be used to
> aggregate status from subtasks.
>
> -- Original message --
> *From: *Robert Metzger 
> *Sent: *Tue May 12 15:36:26 2020
> *To: *Jingsong Li 
> *CC: *Peter Groesbeck , user <
> user@flink.apache.org>
> *Subject: *Re: Writing _SUCCESS Files (Streaming and Batch)
>
>> Hi Peter,
>> I filed a ticket for this feature request:
>> https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your
>> thoughts / requirements to the ticket)
>>
>> Best,
>> Robert
>>
>>
>> On Wed, May 6, 2020 at 3:41 AM Jingsong Li 
>> wrote:
>>
>>> Hi Peter,
>>> The troublesome part is how to know the "ending" of a bucket in a streaming
>>> job.
>>> In 1.11, we are trying to implement a watermark-related bucket ending
>>> mechanism[1] in Table/SQL.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <
>>> peter.groesb...@gmail.com> wrote:
>>>
 I am replacing an M/R job with a Streaming job using the
 StreamingFileSink and 

Re: Need suggestion on Flink-Kafka stream processing design

2020-05-13 Thread Arvid Heise
Hi Hemant,

what you described is an aggregation. You aggregate 15 small records into
one large record. The concept of aggregation goes beyond pure numeric
operations; for example, forming one large string with CONCAT is also a
type of aggregation.

In your case, I'd still follow my general outlined approach. You have two
options:
* Use an infinite window that fires at each new element. You need to ensure
that old metrics are evicted [1].
* You can also implement a KeyedProcessFunction [2] and manage the state
directly. It might initially be a bit more complex (= more code), but it
should be easier to understand and maintain; a minimal sketch follows after the references below.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html#evictors
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
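
For illustration, a hedged sketch of that second option; the Event type and its field names are made up, and state TTL / cleanup is left out:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keeps the latest value per metric for each device and emits a combined
// snapshot on every incoming event.
public class DeviceSnapshotFunction
        extends KeyedProcessFunction<String, DeviceSnapshotFunction.Event, Map<String, Double>> {

    // hypothetical input record
    public static class Event {
        public String deviceId;
        public String metricName;
        public double metricValue;
    }

    private transient MapState<String, Double> latestMetrics;

    @Override
    public void open(Configuration parameters) {
        latestMetrics = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("latest-metrics", Types.STRING, Types.DOUBLE));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Map<String, Double>> out)
            throws Exception {
        // update the metric carried by this event
        latestMetrics.put(event.metricName, event.metricValue);

        // emit the current snapshot of all metrics seen so far for this device
        Map<String, Double> snapshot = new HashMap<>();
        for (Map.Entry<String, Double> entry : latestMetrics.entries()) {
            snapshot.put(entry.getKey(), entry.getValue());
        }
        out.collect(snapshot);
    }
}

// usage: events.keyBy(e -> e.deviceId).process(new DeviceSnapshotFunction());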



On Wed, May 13, 2020 at 12:06 AM hemant singh  wrote:

> Hi Arvid,
>
> I don't want to aggregate all events, rather I want to create a record for
> a device combining data from multiple events. Each of this event gives me a
> metric for a device, so for example if I want one record for device-id=1
> the metric will look like metric1, metric2, metric3  where metric1
> comes from a event1, metric2 from event2 and likewise
> From each event get latest data to form a kind of snapshot of device
> performance across the metrics.
>
> Thanks,
> Hemant
>
> On Wed, May 13, 2020 at 1:38 AM Arvid Heise  wrote:
>
>> Hi Hemant,
>>
>> In general, you want to keep all data coming from one device in one Kafka
>> partition, such that the timestamps of that device are monotonically
>> increasing. Thus, when processing data from one device, you have ensured
>> that no out-of-order events with respect to this device happen.
>>
>> If you now want to aggregate all events of a given timestamp for a
>> device, it is a matter of keying by device id and applying a custom window.
>> There is no need for joins.
>>
>> On Tue, May 12, 2020 at 9:05 PM hemant singh 
>> wrote:
>>
>>> Hello Flink Users,
>>>
>>> Any views on this question of mine.
>>>
>>> Thanks,
>>> Hemant
>>>
>>> On Tue, May 12, 2020 at 7:00 PM hemant singh 
>>> wrote:
>>>
 Hello Roman,

 Thanks for your response.

 I think partitioning you described (event type + protocol type) is
 subject to data skew. Including a device ID should solve this problem.
 Also, including "protocol_type" into the key and having topic per
 protocol_type seems redundant.
 Each protocol is in a single topic and event_type is the key used to distribute
 data to a specific partition.

 Furthermore, do you have any particular reason to maintain multiple
 topics?
 I could imagine protocols have different speeds or other
 characteristics, so you can tune Flink accordingly.
 Otherwise, having a single topic partitioned only by device ID would
 simplify deployment and reduce data skew.
 Yes, you are right. These protocols have different characteristics like
 speed and data format. If I have only one topic with data partitioned by
 device_id, then it could be that events from the faster protocol are processed
 faster and the joins I want to do will not have enough matching data.
 I have a question here: how do you propose to tune Flink to handle
 different characteristics like the speed of streams, given that reading from kafka could
 result in uneven processing of data?

 > By consume do you mean the downstream system?
 My downstream is TSDB and other DBs where the data will be written to.
 All of this is time-series data.

 Thanks,
 Hemant



 On Tue, May 12, 2020 at 5:28 PM Khachatryan Roman <
 khachatryan.ro...@gmail.com> wrote:

> Hello Hemant,
>
> Thanks for your reply.
>
> I think partitioning you described (event type + protocol type) is
> subject to data skew. Including a device ID should solve this problem.
> Also, including "protocol_type" into the key and having topic per
> protocol_type seems redundant.
>
> Furthermore, do you have any particular reason to maintain multiple
> topics?
> I could imagine protocols have different speeds or other
> characteristics, so you can tune Flink accordingly.
> Otherwise, having a single topic partitioned only by device ID would
> simplify deployment and reduce data skew.
>
> > By consume do you mean the downstream system?
> Yes.
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 11:30 PM hemant singh 
> wrote:
>
>> Hello Roman,
>>
>> PFB my response -
>>
>> As I understand, each protocol has a distinct set of event types
>> (where event type == metrics type); and a distinct set of devices. Is 
>> this
>> correct?
>> Yes, correct: distinct events and devices. Each device emits these
>> events.
>>
>> > Based on data protocol I have 4-5 topics. 

When using flink1.7.2 to write to ES6, sometimes the error occurs: request retries exceeded max retry timeout [30000]

2020-05-13 Thread Jim Chen
Hi all,

When I use flink1.7.2 to write to ES6, I sometimes get the error: request retries exceeded max retry timeout
[30000]. The error message is as follows:
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at